Есть такой log collector vector. Посмотрим как можно им собирать логи с подов, но не просто так, а по наличию на namespace специального лейбла.
Я использую fluxcd для бутстрапа k8s кластеров + их наполнения всяким.
На уровне apps/vector описываем следующие манифесты.
kustomization.yaml — основной файл, куда подключаем используемые ресурсы.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- repository.yaml
- helmrelease.yaml
- podlogs-cm.yaml
- metrics-cm.yaml
Сразу скажу, что создания namespace тут нет, оно вынесено отдельно на уровень конкретного кластера, так проще управлять объектами ns-а (квоты и так далее).
repository.yaml — откуда будем забирать чарт. Описываем сущность HelmRepository.
apiVersion: source.toolkit.fluxcd.io/v1
kind: HelmRepository
metadata:
name: vector
namespace: flux-system
spec:
interval: 30m
url: https://helm.vector.dev
helmrelease.yaml — тут описание релиза. Контексто зависимые переменные будут переопределены на уровне кластера.
apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
name: vector
namespace: logging
spec:
releaseName: vector
chart:
spec:
chart: vector
sourceRef:
kind: HelmRepository
name: vector
namespace: flux-system
version: "0.33.0"
interval: 1h0m0s
install:
remediation:
retries: 3
values:
role: "Agent"
podAnnotations: {}
podLabels:
vector.dev/exclude: "false"
command: []
image:
repository: timberio/vector
args:
- --config-dir
- "/etc/vector/"
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 8
memory: 8000Mi #need much RAM on start
dataDir: "/vector-data-dir"
existingConfigMaps:
- vector-config-pod-logs
- vector-config-metrics
containerPorts:
- name: api
containerPort: 8686
protocol: TCP
- name: int-metrics
containerPort: 9598
protocol: TCP
service:
enabled: true
ports:
- name: api
port: 8686
- name: int-metrics
port: 9598
podMonitor:
enabled: false
env:
- name: VECTOR_LOG
value: info
- name: RUST_BACKTRACE
value: "1"
- name: "VECTOR_THREADS"
value: "8"
- name: ALLOCATION_TRACING
value: "false"
- name: VECTOR_REQUIRE_HEALTHY
value: "true"
- name: VECTOR_KAFKA_LOGIN
valueFrom:
secretKeyRef:
name: vector
key: VECTOR_KAFKA_LOGIN
- name: VECTOR_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: vector
key: VECTOR_KAFKA_PASSWORD
livenessProbe:
httpGet:
path: /health
port: api
readinessProbe:
httpGet:
path: /health
port: api
У меня вектор пишет данные в кафку, поэтому добавляем переменных для него + по «дефолту» подключаем только конфиги для метрики и сбора логов подов, без аудит логов.
podlogs-cm.yaml — конфиг мапа для сбора логов с подов.
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-config-pod-logs
namespace: logging
data:
pod-logs.yaml: |
api:
enabled: true
address: "0.0.0.0:8686"
data_dir: /vector-data-dir
expire_metrics_secs: 40
healthchecks:
enabled: true
sources:
metrics:
type: internal_metrics
scrape_interval_secs: 25
kubernetes_pods_logs:
type: kubernetes_logs
max_line_bytes: 10485760
max_read_bytes: 10485760
glob_minimum_cooldown_ms: 3000
ignore_older_secs: 600
delay_deletion_ms: 60000
read_from: "end"
oldest_first: false
exclude_paths_glob_patterns:
- "**/*.gz"
- "**/*.tmp"
auto_partial_merge: true
node_annotation_fields:
node_labels: ""
pod_annotation_fields:
pod_annotations: ""
transforms:
metrics_relabel_pod:
type: remap
inputs: [metrics]
source: |-
.tags.host = get_env_var!("VECTOR_SELF_NODE_NAME")
drop_invalid_messages:
type: filter
inputs: [kubernetes_pods_logs]
condition: |-
(.kubernetes.pod_name != null) && (.kubernetes.container_name != null) && (strip_whitespace!(.message) != "")
dedot_transform:
type: remap
inputs: [drop_invalid_messages]
source: |-
.kubernetes.cluster = "${CLUSTER_NAME}"
.kubernetes.namespace = del(.kubernetes.pod_namespace)
.kubernetes.node = del(.kubernetes.pod_node_name)
. = map_keys(., recursive: true) -> |key| { replace(key, ".", "_") }
separate_json:
type: route
inputs: [dedot_transform]
route:
json_logs: .kubernetes.pod_labels.log_format == "json"
multiline_parser:
type: reduce
inputs: [separate_json._unmatched]
group_by:
- kubernetes.cluster
- kubernetes.pod_namespace
- kubernetes.pod_name
- kubernetes.container_name
max_events: 100
starts_when:
type: "vrl"
source: match(string!(.message), r'^[^\s]')
merge_strategies:
message: concat_newline
json_parser:
type: remap
drop_on_abort: true
drop_on_error: true
reroute_dropped: true
inputs: [separate_json.json_logs]
source: |-
parsed = parse_json!(del(.message))
.message_json = parsed
separate_by_destination:
type: route
inputs:
- multiline_parser
- json_parser
- json_parser.dropped
route:
<route_name>: to_bool!(.kubernetes.namespace_labels."<label_name>")
throttle_logs:
type: throttle
inputs: [separate_by_destination._unmatched]
internal_metrics:
emit_events_discarded_per_key: true
exclude:
type: "vrl"
source: 'to_bool!(.kubernetes.pod_labels."vector_dev/throttle_free")'
key_field: .kubernetes.namespace
threshold: 20000
window_secs: 10
final_transform_for_kafka_sink:
type: remap
drop_on_abort: true
drop_on_error: true
reroute_dropped: true
inputs: [separate_by_destination.<route_name>]
source: |-
del(.kubernetes.namespace_labels)
final_transform_for_default_sink:
type: remap
drop_on_abort: true
drop_on_error: true
reroute_dropped: true
inputs: [throttle_logs]
source: |-
del(.kubernetes.namespace_labels)
sinks:
kafka_sink:
type: kafka
inputs: [final_transform_for_kafka_sink]
bootstrap_servers: "<bootstrap_address>"
topic: "<topic_name>"
encoding:
codec: json
librdkafka_options:
security.protocol: "SASL_SSL"
enable.ssl.certificate.verification: "false"
sasl:
enabled: true
password: ${VECTOR_KAFKA_PASSWORD}
username: ${VECTOR_KAFKA_LOGIN}
mechanism: "PLAIN"
Если будете использовать у себя, не забудьте поправить <> placeholder-ы. Немного о том, что происходит в этом трасформе.
Логи подов (kubernetes_pods_logs) имею лимиты по 10МБ (max_line_bytes и max_read_bytes). Не читаем старше 600 сек, ignore_older_secs параметр. Читаем с конца, параметр read_from. Исключаем архивные и .tmp файлы. В метрики добавляем имя хоста. Вектор имет предопределенные набор лейблов на метрики, который лежит в .tags. Мы можем переопределить существующие или добавить новые. Потом, когда будем их скрайпить, мы увидим в prometheus/grafana то, что положили в .tags.
drop_invalid_messages — на вход подаем kubernetes_pods_logs. Пускаем в обработку дальше, только если есть .kubernetes.pod_name, .kubernetes.container_name и .message не пустой после strip_whitespace.
dedot_transform — добавляем поле с названием кластера .kubernetes.cluster = «${CLUSTER_NAME}». Переменную для него мы позже опишем на уровне кластера в patch-values. Самое важное тут map_keys. «Раздотирование» всех ключей рекурсивно, т.е. точки в названиях полей заменяются на подчёркивания.
Далее мы делим логи на JSON (separate_json route) и обычные (multiline_parser). JSON логи идут в json_parser, остальное в _unmatched.
Главное это separate_by_destination — тут происходит сбор логов по заданному на namespace лейблу. NS-ы подходящие под это пойдут в синк kafka_sink. Все остальное идет в throttle_logs, которые после уходят в никуда, то есть события эти мы нигде не увидим.
metrics-cm.yaml — конфигмапа для сбора метрик.
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-config-metrics
namespace: logging
data:
metrics.yaml: |
sources:
vector_metrics:
type: internal_metrics
scrape_interval_secs: 25
transforms:
metrics_relabel:
type: remap
inputs:
- vector_metrics
source: |-
.tags.host = get_env_var!("VECTOR_SELF_NODE_NAME")
.tags.cluster = get_env_var!("CLUSTER_NAME")
sinks:
vector_metrics_to_prometheus:
type: prometheus_exporter
inputs:
- metrics_relabel
address: 0.0.0.0:9598
flush_period_secs: 60
Выше в парсинге, мы добавляли поле с именем k8s кластера в логи. Тут мы добавляем это поле в метрики.
Далее уровень конкретного кластера.
У меня в структуре это clusters/<cluster_name>/apps/vector.
kustomization.yaml.
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../../../apps/vector
- secret.yaml
patches:
- path: patch-values.yaml
target:
kind: HelmRelease
name: vector
namespace: logging
patch-values.yaml — тут переопределения под конкретный кластер.
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
name: vector
namespace: logging
spec:
values:
existingConfigMaps:
- vector-config-pod-logs
- vector-config-metrics
env:
- name: CLUSTER_NAME
value: <cluster_name>
- name: VECTOR_LOG
value: info
- name: RUST_BACKTRACE
value: "1"
- name: "VECTOR_THREADS"
value: "8"
- name: ALLOCATION_TRACING
value: "false"
- name: VECTOR_REQUIRE_HEALTHY
value: "true"
- name: VECTOR_KAFKA_LOGIN
valueFrom:
secretKeyRef:
name: vector
key: VECTOR_KAFKA_LOGIN
- name: VECTOR_KAFKA_PASSWORD
valueFrom:
secretKeyRef:
name: vector
key: VECTOR_KAFKA_PASSWORD
Секрет описывать не буду, тут для его хранения в репе лучше использовать или интеграцию с волт или sops для шифрования.
По итогу после развертывания, vector будет работать как daemonsets, логи будут идти в кафку только с тех подов, которые размещены в ns у которого есть соответствующий label.