Shipping Container stdout/stderr Logs to Kafka with Fluent Bit
What is Fluent Bit?
Fluent Bit is an open-source, lightweight log processor and forwarder designed for embedded systems, containerized environments, and distributed architectures. Its core features are log collection, filtering, aggregation, and delivery, and it supports a wide range of input sources (files, syslog, HTTP endpoints) and output destinations (Elasticsearch, Kafka, cloud storage services).
Workflow
Logs travel through a data pipeline from the data source to the destination. A pipeline is built from stages such as input, parser, filter, buffer, routing, and output (a minimal configuration sketch follows this list):
- Input plugins: extract data from a source; a single pipeline can contain multiple inputs.
- Parser: converts the unstructured data collected by an input into structured records; each input can define its own parser.
- Filter plugins: filter and modify the structured records. A pipeline can contain multiple filters, and they run in the order they appear in the configuration file.
- Buffer: caches records that have passed through the filters; by default the buffer holds input data in memory until routing delivers it to an output.
- Routing: routes the buffered records to the different outputs.
- Output plugins: deliver records to their destinations; a single pipeline can contain multiple outputs.
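To make the stage order concrete, here is a minimal standalone configuration sketch. It is not the Kubernetes configuration used later in this post; the tail path and the grep/stdout plugins are illustrative assumptions:

[SERVICE]
    Flush     5
    Log_Level info

[INPUT]
    Name      tail
    Path      /tmp/app.log

[FILTER]
    Name      grep
    Match     *
    Regex     log error

[OUTPUT]
    Name      stdout
    Match     *

With no parser attached, tail emits records shaped like {"log": "<line>"}; the grep filter keeps only records whose log field matches "error", and stdout prints whatever routing delivers to it.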
Deploying the Kafka Cluster
Install the JDK
[root@k8s-master ~]# java -version
java version "21.0.6" 2025-01-21 LTS
Java(TM) SE Runtime Environment (build 21.0.6+8-LTS-188)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.6+8-LTS-188, mixed mode, sharing)
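If no JDK is present yet, one option is the distribution's OpenJDK package (a sketch assuming a RHEL-family system; the transcript above happens to show an Oracle JDK, which works just as well):

dnf install -y java-21-openjdk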
Deploy Kafka
[root@k8s-master kafaka-zookeeper]# tar -xvf kafka_2.12-3.8.0.tgz
[root@k8s-master kafaka-zookeeper]# mkdir -p /home/kafaka-zookeeper/kafka_2.12-3.8.0/{data,logs}
[root@k8s-master kafaka-zookeeper]# cd /home/kafaka-zookeeper/kafka_2.12-3.8.0/config/
[root@k8s-master config]# cp server.properties server.properties.bak
[root@k8s-master config]# sed -i "/#/d" server.properties
[root@k8s-master config]# ll
total 80
-rw-r--r-- 1 root root 906 Jul 23 2024 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Jul 23 2024 connect-console-source.properties
-rw-r--r-- 1 root root 5475 Jul 23 2024 connect-distributed.properties
-rw-r--r-- 1 root root 883 Jul 23 2024 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Jul 23 2024 connect-file-source.properties
-rw-r--r-- 1 root root 2063 Jul 23 2024 connect-log4j.properties
-rw-r--r-- 1 root root 2540 Jul 23 2024 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2262 Jul 23 2024 connect-standalone.properties
-rw-r--r-- 1 root root 1221 Jul 23 2024 consumer.properties
drwxr-xr-x 2 root root 4096 Jul 23 2024 kraft
-rw-r--r-- 1 root root 4917 Jul 23 2024 log4j.properties
-rw-r--r-- 1 root root 2065 Jul 23 2024 producer.properties
-rw-r--r-- 1 root root 544 Mar 14 14:27 server.properties
-rw-r--r-- 1 root root 6896 Mar 14 14:27 server.properties.bak
-rw-r--r-- 1 root root 1094 Jul 23 2024 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jul 23 2024 trogdor.conf
-rw-r--r-- 1 root root 1205 Jul 23 2024 zookeeper.properties
[root@k8s-master config]# vim server.properties
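The settings to adjust for this three-broker cluster look roughly like the following (a sketch: broker.id must be unique per broker, listeners should carry each node's own IP, and the ZooKeeper addresses are assumptions based on the broker IPs used later in this post):

broker.id=0
listeners=PLAINTEXT://192.168.9.128:9092
log.dirs=/home/kafaka-zookeeper/kafka_2.12-3.8.0/data
zookeeper.connect=192.168.9.128:2181,192.168.9.129:2181,192.168.9.130:2181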
[root@k8s-master config]# cd ..
[root@k8s-master kafka_2.12-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
[1] 16270
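A quick sanity check that the broker is accepting connections (run from the Kafka root; the IP is this node's listener address):

bin/kafka-topics.sh --bootstrap-server 192.168.9.128:9092 --list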
Deploy ZooKeeper
[root@k8s-node2 ~]# mkdir -p /home/kafaka-zookeeper
[root@k8s-node2 ~]# cd /home/kafaka-zookeeper/
[root@k8s-node2 kafaka-zookeeper]# mv apache-zookeeper-3.8.4-bin apache-zookeeper-3.8.4
[root@k8s-node2 kafaka-zookeeper]# mkdir -p /home/kafaka-zookeeper/apache-zookeeper-3.8.4/{data,logs}
[root@k8s-node2 kafaka-zookeeper]# cd /home/kafaka-zookeeper/apache-zookeeper-3.8.4/conf/
[root@k8s-node2 conf]# mv zoo_sample.cfg zoo.cfg
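For a three-node ensemble, zoo.cfg also needs the data directory and the peer list, and each node needs a matching myid file in its data directory. A sketch, with peer addresses assumed from the broker IPs used in this post:

dataDir=/home/kafaka-zookeeper/apache-zookeeper-3.8.4/data
server.1=192.168.9.128:2888:3888
server.2=192.168.9.129:2888:3888
server.3=192.168.9.130:2888:3888

echo 1 > /home/kafaka-zookeeper/apache-zookeeper-3.8.4/data/myid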
[root@k8s-node2 data]# /home/kafaka-zookeeper/apache-zookeeper-3.8.4/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/kafaka-zookeeper/apache-zookeeper-3.8.4/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 11484.
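Once all nodes are up, zkServer.sh status shows each node's role (leader or follower):

/home/kafaka-zookeeper/apache-zookeeper-3.8.4/bin/zkServer.sh status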
Configure time synchronization on every node (skewed clocks distort log timestamps):
sudo ntpdate ntp.aliyun.com
Deploying Fluent Bit on Kubernetes
fluent-bit-config.yaml
[root@k8s-master ~]# cat fluent-bit-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: kube-system
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush             5
        Log_Level         info
        Daemon            off
        Parsers_File      parsers.conf

    [INPUT]
        Name              tail
        Path              /var/log/containers/*.log
        Parser            docker
        Tag               k8s.*
        Refresh_Interval  5
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On

    [FILTER]
        Name              kubernetes
        Match             k8s.*
        Kube_URL          https://kubernetes.default.svc:443
        Kube_CA_File      /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File   /var/run/secrets/kubernetes.io/serviceaccount/token
        # Must be the tail Tag prefix plus the dotted log path, so the
        # filter can recover pod metadata from the file name
        Kube_Tag_Prefix   k8s.var.log.containers.
        Merge_Log         On
        Keep_Log          Off

    [OUTPUT]
        Name              kafka
        Match             *
        Brokers           192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092
        Topics            k8s-logs
        rdkafka.compression.codec snappy

  parsers.conf: |
    [PARSER]
        Name            docker
        Format          json
        Time_Key        time
        Time_Format     %Y-%m-%dT%H:%M:%S.%LZ
        Decode_Field_As escaped_utf8 log
fluent-bit-daemonset.yaml
[root@k8s-master ~]# cat fluent-bit-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: kube-system
  labels:
    app: fluent-bit
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      serviceAccountName: fluent-bit
      tolerations: # Allow scheduling on every node, including the master
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      containers:
      - name: fluent-bit
        image: fluent/fluent-bit:1.9.10
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: dockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: dockercontainers
        hostPath:
          path: /var/lib/docker/containers # With containerd, use /var/log/pods instead
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
---
# RBAC so the kubernetes filter can query the API server
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-read
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-read
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: kube-system
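Apply both manifests; the DaemonSet then rolls out one Fluent Bit pod per node:

kubectl apply -f fluent-bit-config.yaml
kubectl apply -f fluent-bit-daemonset.yaml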
[root@k8s-master ~]# kubectl get pod -n kube-system | grep flu
fluent-bit-4rvfx 1/1 Running 0 31d
fluent-bit-lqhcm 1/1 Running 0 31d
fluent-bit-r6xrh 1/1 Running 0 31d
Check a pod's logs to verify that Fluent Bit reached both the API server and the Kafka brokers:
[root@k8s-master bin]# kubectl logs fluent-bit-lqhcm -n kube-system
Fluent Bit v1.9.10
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2025/02/11 08:58:15] [ info] [fluent bit] version=1.9.10, commit=760956f50c, pid=1
[2025/02/11 08:58:15] [ info] [storage] version=1.3.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2025/02/11 08:58:15] [ info] [cmetrics] version=0.3.7
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] https=1 host=kubernetes.default.svc port=443
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] token updated
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] local POD info OK
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] testing connectivity with API server...
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] connectivity OK
[2025/02/11 08:58:15] [ info] [output:kafka:kafka.0] brokers='192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092' topics='k8s-logs'
[2025/02/11 08:58:15] [ info] [sp] stream processor started
Testing:
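If automatic topic creation is disabled on the brokers, create the topic first (the partition and replication counts here are assumptions):

/home/kafaka-zookeeper/kafka_2.12-3.8.0/bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.9.128:9092 \
  --topic k8s-logs --partitions 3 --replication-factor 3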
Publish a test message from a producer host:
[root@k8s-master bin]# echo "test message" | /home/kafaka-zookeeper/kafka_2.12-3.8.0/bin/kafka-console-producer.sh --bootstrap-server 192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092 --topic k8s-logs
Watch for it on a consumer host:
/home/kafaka-zookeeper/kafka_2.12-3.8.0/bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092 \
--topic k8s-logs \
--from-beginning
The container logs can be seen arriving in Kafka.