使用fluentd收集kubernetes日志并推送到ES

使用fluentd收集kubernetes日志并推送到ES

这篇文章使用fluentd官方提供的kubernetes部署方案daemonset来部署日志收集, 参考项目地址:

本文使用的kubernetes版本为: 1.22.8

使用fluentd镜像为: fluent/fluentd-kubernetes-daemonset:v1.15.2-debian-elasticsearch7-1.0

请注意下文配置中<var>标记, 需要根据需求自行替换.

创建命名空间 #

本项目所有的资源创建在logging下, 先创建它:

1
2
NAMESPACE=logging
kubectl create ns $NAMESPACE

先创建服务账号 #

创建服务账号并赋予集群查看的权限, 使用下面的命令:

1
2
3
4
5
6
kubectl -n $NAMESPACE create -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
EOF

创建绑定关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
kubectl create -f - <<EOF
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: view
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: ${NAMESPACE}
EOF

创建配置文件 #

配置文件使用configmap挂在在容器内, 覆盖容器内现有的配置文件. 根据实际需要创建配置文件. 官方镜像提供个默认配置之外, 我有以下额外的需求,

  • 根据用户为pod配置的标签collect-logs: true来判断是否收集该容器日志, 没有配置该标签不会收集.
  • 需要将日志输出到elasticsearch中, 同时要保证日志的唯一, 不能同一条日志在elasticsearch中存在多条.
  • 推送到ES中的日志都带有集群标识, 方便检索.

好, 开始创建, 先创建一个文件:

1
vi /tmp/fluent.conf

写入下面的内容, 这里的内容大部分是官方镜像中拷贝过来的, 我在其基础之上做了一些修改以满足我的需求:

注意: CLUSTER_ID表示为集群的唯一标识,请替换!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# AUTOMATICALLY GENERATED
# DO NOT EDIT THIS FILE DIRECTLY, USE /templates/conf/fluent.conf.erb

@include "#{ENV['FLUENTD_SYSTEMD_CONF'] || 'systemd'}.conf"
@include "#{ENV['FLUENTD_PROMETHEUS_CONF'] || 'prometheus'}.conf"
@include kubernetes.conf
@include conf.d/*.conf

<match kubernetes.**>
  @type rewrite_tag_filter
  <rule>
    key $.kubernetes.labels.collect-logs
    pattern /^true$/
    tag collect-logs.CLUSTER_ID
  </rule>
</match>

<filter collect-logs.CLUSTER_ID>
  @type elasticsearch_genid
  hash_id_key _hash
</filter>

<match collect-logs.CLUSTER_ID>
   @type elasticsearch
   @id out_es
   @log_level info
   include_tag_key true
   hosts "#{ENV['FLUENT_ELASTICSEARCH_HOSTS']}"
   host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
   port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
   path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
   scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
   ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
   ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
   user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
   password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
   reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
   reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
   reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
   log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
   logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
   logstash_dateformat "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_DATEFORMAT'] || '%Y.%m.%d'}"
   logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
   index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
   target_index_key "#{ENV['FLUENT_ELASTICSEARCH_TARGET_INDEX_KEY'] || use_nil}"
   type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
   include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'false'}"
   template_name "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_NAME'] || use_nil}"
   template_file "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_FILE'] || use_nil}"
   template_overwrite "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_OVERWRITE'] || use_default}"
   sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
   request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '5s'}"
   application_name "#{ENV['FLUENT_ELASTICSEARCH_APPLICATION_NAME'] || use_default}"
   suppress_type_name "#{ENV['FLUENT_ELASTICSEARCH_SUPPRESS_TYPE_NAME'] || 'true'}"
   enable_ilm "#{ENV['FLUENT_ELASTICSEARCH_ENABLE_ILM'] || 'false'}"
   ilm_policy_id "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY_ID'] || use_default}"
   ilm_policy "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY'] || use_default}"
   ilm_policy_overwrite "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY_OVERWRITE'] || 'false'}"
   id_key _hash
   remove_keys _hash
   <buffer>
     @type file
     path /var/log/fluentd/buffers-es
     flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
     flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
     chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
     retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
     retry_forever true
     overflow_action "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_OVERFLOW_ACTION'] || 'block'}"
   </buffer>
</match>

生成configmap:

1
kubectl -n $NAMESPACE create configmap fluentd-conf --from-file=fluent.conf=/tmp/fluent.conf

创建daemonset部署 #

先创建deployment部署文件:

1
vi /tmp/deployment.yaml

将下面的内容拷贝进去,之后:wq:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  labels:
    k8s-app: fluentd-logging
    version: v1
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/control-plane
        effect: NoSchedule
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.15.2-debian-elasticsearch7-1.0
        env:
          - name: K8S_NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name:  FLUENT_ELASTICSEARCH_HOSTS
            value: "<es1-host>:<es1-port>,<es2-host>:<es2-port>,<es3-host>:<es3-port>"
          # or use
          # - name:  FLUENT_ELASTICSEARCH_HOST
          #   value: "<es-host>"
          # - name:  FLUENT_ELASTICSEARCH_PORT
          #   value: "<es-port>"
          - name: FLUENT_ELASTICSEARCH_USER
            value: "<es-user>"
          - name: FLUENT_ELASTICSEARCH_PASSWORD
            value: "<es-password>"
          - name: FLUENT_ELASTICSEARCH_LOG_ES_400_REASON
            value: "true"
          # when log formt is not json, unconmment
          # - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
          #   value: "/^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/"
          # - name: FLUENT_CONTAINER_TAIL_PARSER_TIME_FORMAT
          #   value: "%Y-%m-%dT%H:%M:%S.%N%:z"
        resources:
          limits:
            memory: 600Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        # When actual pod logs in /var/lib/docker/containers, the following lines should be used.
        # - name: dockercontainerlogdirectory
        #   mountPath: /var/lib/docker/containers
        #   readOnly: true
        - name: config-volume
          mountPath: /fluentd/etc/fluent.conf
          subPath: fluent.conf
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      # When actual pod logs in /var/lib/docker/containers, the following lines should be used.
      # - name: dockercontainerlogdirectory
      #   hostPath:
      #     path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-conf
1
kubectl -n $NAMESPACE apply -f /tmp/deployment.yaml