当前位置: 首页 > article >正文

深入Kafka KRaft模式:生产环境配置详解

kafka介绍

什么是 Kafka

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

Kafka 的基本术语

  • 消息 (Message) :Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
  • 批次 (Batch) :为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
  • 主题 (Topic) :消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
  • 分区 (Partition) :主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

  • 生产者 (Producer) :向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
  • 消费者 (Consumer) :订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
  • 消费者组 (Consumer Group) :生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

  • 偏移量 (Offset) :是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
  • 节点( broker): 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
  • 副本 (Replica) :Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
  • 重平衡(Rebalance)。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的特性

高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。

高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。

持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。

容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作

高并发:支持数千个客户端同时读写

Kafka 的使用场景

活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。

传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。

度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

流式处理:流式处理是有一个能够提供多种应用程序的领域。

限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 系统架构

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

核心 API

Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

Kafka 为何快

Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。

批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅 程序员需要了解的硬核知识之磁盘 。

总结一下其实就是四个要点

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

KRaft模式介绍

架构介绍

Kafka的KRaft模式是一种新的元数据管理方式,旨在去除对ZooKeeper的依赖,使Kafka成为一个完全自包含的系统。在Kafka的传统模式下,元数据管理依赖于ZooKeeper,这增加了部署和运维的复杂性。为了解决这个问题,Kafka社区引入了KRaft模式。在KRaft模式下,所有的元数据,包括主题、分区信息、副本位置等,都被存储在Kafka集群内部的特殊日志中。这个日志使用Raft协议来保证一致性。

在传统架构中,Kafka集群包含多个 Broker 节点和一个ZooKeeper 集群。Kafka 集群的 Controller 在被选中后,会从 ZooKeeper 中加载它的状态。并且通知其他Broker发生变更,如 Leaderanddis r和 Updatemetdata 请求。

在新的架构中,三个 Controller 节点替代三个ZooKeeper节点。Controller节点和 Broker 节点运行在不同的进程中。Controller 节点中会选举出一个 Leader 角色。并且Leader 不会主动向 Broker 推送更新,而是由 Broker 拉取元数据信息。

注意:Controller 进程与 Broker 进程在逻辑上是分离的,同时允许部分或所有 Controller 进程和 Broker 进程是同一个进程,即一个Broker节点即是Broker也是Controller。

优点

简化部署:不再需要单独部署和维护ZooKeeper集群,降低了运维复杂性和成本。

一致性和可靠性:Raft协议提供了强一致性保证,确保元数据在多个节点之间的一致复制,提高了系统的可靠性。

高可用性:通过控制节点的多数共识机制,在少数节点故障的情况下仍能保证集群的正常运行。

性能优化:减少了Kafka与ZooKeeper之间的通信开销,可能带来性能上的提升。

集群部署

集群规划

一般模式下,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。kraft 模式架构下,不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。

ip主机名角色node id
192.168.10.31kafka-1Broker,Controller1
192.168.10.32kafka-2Broker,Controller2
192.168.10.33kafka-3Broker,Controller3

准备工作

以下操作在所有 kafka 节点执行

  1. 下载软件包

下载地址:https://kafka.apache.org/downloads

root@kafka-1:~# wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
  1. 解压
root@kafka-1:~# tar -zxf kafka_2.13-3.9.0.tgz
root@kafka-1:~# mv kafka_2.13-3.9.0 /opt/kafka
root@kafka-1:~# cd /opt/kafka/
root@kafka-1:/opt/kafka# ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs
  1. 安装 java 环境
root@kafka-1:/opt/kafka# apt install openjdk-21-jdk -y
root@kafka-1:/opt/kafka# java -version
openjdk version "21.0.5" 2024-10-15
OpenJDK Runtime Environment (build 21.0.5+11-Ubuntu-1ubuntu120.04)
OpenJDK 64-Bit Server VM (build 21.0.5+11-Ubuntu-1ubuntu120.04, mixed mode, sharing)
  1. 创建 kafka 数据目录
root@kafka-1:/opt/kafka# mkdir -p /data/kafka/

修改配置文件

kafka 侦听器类型

PLAINTEXT:用于不加密的普通通信。 listeners=PLAINTEXT://:9092
SSL:用于加密通信,确保数据传输的安全性。 listeners=SSL://:9093
SASL_PLAINTEXT:在不加密的基础上,添加身份验证机制。listeners=SASL_PLAINTEXT://:9094
SASL_SSL:结合加密和身份验证,确保通信的机密性和完整性。listeners=SASL_SSL://:9095
CONTROLLER:用于 Kafka 集群控制器进行内部通信,管理 Broker 状态。listeners=CONTROLLER://:9096
EXTERNAL:专为外部客户端访问设计,通常用于跨网络的通信。listeners=EXTERNAL://:9097

kafka-1 配置

root@kafka-1:/opt/kafka# vim /opt/kafka/config/kraft/server.properties
# 节点ID,集群内唯一
node.id=1 
# 集群地址信息
controller.quorum.voters=1@192.168.10.31:9093,2@192.168.10.32:9093,3@192.168.10.33:9093
# 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
advertised.listeners=PLAINTEXT://192.168.10.31:9092,CONTROLLER://192.168.10.31:9093
# kafka数据目录
log.dirs=/data/kafka

kafka-2 配置

root@kafka-2:/opt/kafka# vim config/kraft/server.properties
# 节点ID,集群内唯一
node.id=2
# 集群地址信息
controller.quorum.voters=1@192.168.10.31:9093,2@192.168.10.32:9093,3@192.168.10.33:9093
# 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
advertised.listeners=PLAINTEXT://192.168.10.32:9092,CONTROLLER://192.168.10.32:9093
# kafka数据目录
log.dirs=/data/kafka

kafka-3 配置

root@kafka-3:/opt/kafka# vim config/kraft/server.properties
# 节点ID,集群内唯一
node.id=3
# 集群地址信息
controller.quorum.voters=1@192.168.10.31:9093,2@192.168.10.32:9093,3@192.168.10.33:9093
# 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
advertised.listeners=PLAINTEXT://192.168.10.33:9092,CONTROLLER://192.168.10.33:9093
# kafka数据目录
log.dirs=/data/kafka

初始化集群

生成存储目录唯一ID

root@kafka-1:/opt/kafka# bin/kafka-storage.sh random-uuid
1pW25KKcSUmTTXCS8H3qsQ

格式化 kafka 存储目录(每个节点都需要执行)

root@kafka-1:/opt/kafka# bin/kafka-storage.sh format -t 1pW25KKcSUmTTXCS8H3qsQ -c /opt/kafka/config/kraft/server.properties
Formatting metadata directory /tmp/kraft-combined-logs with metadata.version 3.9-IV0.

启动集群

每个节点都执行启动服务命令

root@kafka-1:/opt/kafka# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties

查看服务日志

root@kafka-1:/opt/kafka# tail -f /opt/kafka/logs/server.log

集群验证

查看 kafka 节点状态

root@kafka-1:/opt/kafka# bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092

查看 topic 信息

root@kafka-1:/opt/kafka# bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

systemd 管理服务

  1. 创建服务文件
root@kafka-1:/opt/kafka# vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (KRaft mode)
Documentation=https://kafka.apache.org/documentation/
After=network.target

[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

# 设置 JMX 配置(可选)
Environment=KAFKA_OPTS="-Djava.rmi.server.hostname=192.168.10.31 -Dcom.sun.management.jmxremote.port=9997 -Dcom.sun.management.jmxremote.rmi.port=9997 -Dcom.sun.management.jmxremote. -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# 设置 Kafka 日志输出(可选)
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=kafka

[Install]
WantedBy=multi-user.target
  1. 重新启动 kafka 服务
root@kafka-1:~# systemctl daemon-reload 
root@kafka-1:~# systemctl enable kafka
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
root@kafka-1:~# systemctl restart kafka
root@kafka-1:~# systemctl status kafka

部署Kafka-ui

Kraft 模式的 kafka 集群管理工具推荐使用 kafka-ui。

下载 jar 包

下载地址:https://github.com/provectus/kafka-ui/releases

root@kafka-1:~# mkdir /opt/kafka-ui
root@kafka-1:~# cd /opt/kafka-ui/
root@kafka-1:/opt/kafka-ui# wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar
root@kafka-1:/opt/kafka-ui# ls
kafka-ui-api-v0.7.2.jar

创建配置文件

root@kafka-1:/opt/kafka-ui# cat > config.yml << EOF
kafka:
  clusters:
    -
      name: kafka-cluster
      bootstrapServers: http://192.168.10.31:9092,http://192.168.10.32:9092,http://192.168.10.33:9092
      metrics:
        port: 9997
        type: JMX
EOF

配置文件具体可参考样例配置:https://github.com/provectus/kafka-ui/blob/master/documentation/compose/kafka-ui.yaml。

需要注意的是样例文件为环境变量导入配置,如果需要转为配置文件,可使用此工具:https://env.simplestep.ca/进行转换。

启动服务

root@kafka-1:/opt/kafka-ui# nohup java -Dspring.config.additional-location=/opt/kafka-ui/config.yml -jar /opt/kafka-ui/kafka-ui-api-v0.7.2.jar &

访问验证

使用 systemd 管理服务

root@kafka-1:/opt/kafka-ui# vim /etc/systemd/system/kafka-ui.service
[Unit]
Description=Kafka UI Service
After=network.target

[Service]
ExecStart=/usr/lib/jvm/java-21-openjdk-amd64/bin/java -Dspring.config.additional-location=/opt/kafka-ui/config.yml -jar /opt/kafka-ui/kafka-ui-api-v0.7.2.jar
User=root
Group=root
WorkingDirectory=/opt/kafka-ui
Restart=always
Environment=JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64  # 根据实际环境设置 JAVA_HOME
Environment=PATH=$PATH:/usr/lib/jvm/java-21-openjdk-amd64/bin

[Install]
WantedBy=multi-user.target
root@kafka-1:/opt/kafka-ui# systemctl daemon-reload 
root@kafka-1:/opt/kafka-ui# systemctl enable kafka-ui
root@kafka-1:/opt/kafka-ui# systemctl restart kafka-ui

Kafka基本使用

查看Broker情况

root@kafka-1:/opt/kafka# bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
192.168.10.32:9092 (id: 2 rack: null) -> (
        ……
)
192.168.10.33:9092 (id: 3 rack: null) -> (
        ……
)
192.168.10.31:9092 (id: 1 rack: null) -> (
        ……
)

测试创建topic

root@kafka-1:/opt/kafka# bin/kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --bootstrap-server 127.0.0.1:9092
Created topic test.

查看topic 的情况

root@kafka-1:/opt/kafka# bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092
Topic: test     TopicId: SOYQa_56REWM9mt1vdmj7Q PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,3   Isr: 2,3        Elr:    LastKnownElr: 
        Topic: test     Partition: 1    Leader: 3       Replicas: 3,1   Isr: 3,1        Elr:    LastKnownElr:
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr:

生产者发送消息

root@kafka-1:/opt/kafka# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello kafka

消费者消费消息

root@kafka-1:/opt/kafka# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
hello kafka

删除 topic

root@kafka-1:/opt/kafka# bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic test

TLS加密

准备证书

仅在其中一个节点操作既可。

root@kafka-1:/opt/kafka# mkdir /opt/kafka/pki
root@kafka-1:/opt/kafka# cd /opt/kafka/pki/
# 生成 CA 证书
root@kafka-1:/opt/kafka/pki# openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -keyout ca.key -out ca.crt -subj "/CN=Kafka-CA"
Generating a RSA private key
...................................................++++
..................................................................................................................................................++++
writing new private key to 'ca.key'
# 生成私钥
root@kafka-1:/opt/kafka/pki# openssl genrsa -out kafka.key 4096
Generating RSA private key, 4096 bit long modulus (2 primes)
..............................................................................................................++++
........................................++++
e is 65537 (0x010001)
# 生成证书签名请求 (CSR)
root@kafka-1:/opt/kafka/pki# openssl req -new -key kafka.key -out kafka.csr -subj "/CN=kafka-cluster"
# 创建包含所有节点的SAN 配置文件
root@kafka-1:/opt/kafka/pki# cat > san.cnf << EOF
[ req ]
distinguished_name = req_distinguished_name
req_extensions = req_ext
prompt = no

[ req_distinguished_name ]
CN = kafka-cluster

[ req_ext ]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
# 节点主机名与ip
DNS.1 = kafka-1
DNS.2 = kafka-2
DNS.3 = kafka-3
IP.1 = 192.168.10.31
IP.2 = 192.168.10.32
IP.3 = 192.168.10.33
EOF
# 签署证书
root@kafka-1:/opt/kafka/pki# openssl x509 -req -in kafka.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out kafka.crt \
  -days 3650 -extfile san.cnf -extensions req_ext
Signature ok
subject=CN = kafka-cluster
Getting CA Private Key
# 验证证书
root@kafka-1:/opt/kafka/pki# openssl x509 -in kafka.crt -text -noout | grep -A 1 "Subject Alternative Name"
            X509v3 Subject Alternative Name: 
                DNS:kafka-1, DNS:kafka-2, DNS:kafka-3, IP Address:192.168.10.31, IP Address:192.168.10.32, IP Address:192.168.10.33
root@kafka-1:/opt/kafka/pki# ls -l
total 28
-rw-r--r-- 1 root root 1805 Jan 15 17:54 ca.crt
-rw------- 1 root root 3272 Jan 15 17:54 ca.key
-rw-r--r-- 1 root root   41 Jan 15 17:54 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 17:54 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 17:49 kafka.csr
-rw------- 1 root root 3247 Jan 15 17:49 kafka.key
-rw-r--r-- 1 root root  259 Jan 15 17:51 san.cnf

创建 Keystore

将证书和私钥转换为 PKCS12 文件

root@kafka-1:/opt/kafka/pki# openssl pkcs12 -export -in kafka.crt -inkey kafka.key -out kafka.p12 -name kafka-cert -CAfile ca.crt -caname root -passout pass:123.com

使用 keytoolkafka.p12 文件导入到 Keystore:

root@kafka-1:/opt/kafka/pki# keytool -importkeystore \
  -deststorepass 123.com \
  -destkeypass 123.com\
  -destkeystore kafka.keystore.jks \
  -srckeystore kafka.p12 \
  -srcstoretype PKCS12 \
  -srcstorepass 123.com \
  -alias kafka-cert
Importing keystore kafka.p12 to kafka.keystore.jks...
root@kafka-1:/opt/kafka/pki# ls -l
total 44
-rw-r--r-- 1 root root 1805 Jan 15 18:56 ca.crt
-rw------- 1 root root 3272 Jan 15 18:56 ca.key
-rw-r--r-- 1 root root   41 Jan 15 18:57 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 18:57 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 18:57 kafka.csr
-rw------- 1 root root 3243 Jan 15 18:57 kafka.key
-rw-r--r-- 1 root root 4288 Jan 15 18:58 kafka.keystore.jks
-rw------- 1 root root 4098 Jan 15 18:58 kafka.p12
-rw-r--r-- 1 root root  303 Jan 15 18:57 san.cnf

创建 Truststore

使用 keytool 创建 Truststore 并导入 CA 证书:

root@kafka-1:/opt/kafka/pki# keytool -import \
  -file ca.crt \
  -keystore kafka.truststore.jks \
  -storepass 123.com \
  -alias root
Trust this certificate? [no]:  yes
Certificate was added to keystore
root@kafka-1:/opt/kafka/pki# ls -l
total 48
-rw-r--r-- 1 root root 1805 Jan 15 18:56 ca.crt
-rw------- 1 root root 3272 Jan 15 18:56 ca.key
-rw-r--r-- 1 root root   41 Jan 15 18:57 ca.srl
-rw-r--r-- 1 root root 1777 Jan 15 18:57 kafka.crt
-rw-r--r-- 1 root root 1590 Jan 15 18:57 kafka.csr
-rw------- 1 root root 3243 Jan 15 18:57 kafka.key
-rw-r--r-- 1 root root 4288 Jan 15 18:58 kafka.keystore.jks
-rw------- 1 root root 4098 Jan 15 18:58 kafka.p12
-rw-r--r-- 1 root root 1654 Jan 15 18:58 kafka.truststore.jks
-rw-r--r-- 1 root root  303 Jan 15 18:57 san.cnf

分发文件

将kafka.truststore.jks 和kafka.keystore.jks 文件分发到其他 kafka 节点

root@kafka-1:/opt/kafka/pki# scp kafka.truststore.jks 192.168.10.32:/opt/kafka/pki/
root@kafka-1:/opt/kafka/pki# scp kafka.keystore.jks  192.168.10.32:/opt/kafka/pki/
root@kafka-1:/opt/kafka/pki# scp kafka.truststore.jks 192.168.10.33:/opt/kafka/pki/
root@kafka-1:/opt/kafka/pki# scp kafka.keystore.jks  192.168.10.33:/opt/kafka/pki/

Kafka服务端配置 TLS

在 Kafka KRaft 模式下的 server.properties 文件中,添加以下配置:

root@kafka-1:/opt/kafka/pki# vim /opt/kafka/config/kraft/server.properties
# 修改SSL配置
listeners=SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SSL
advertised.listeners=SSL://192.168.10.31:9092,CONTROLLER://192.168.10.31:9093

# 新增Keystore配置
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.key.password=123.com
# 新增Truststore配置
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
# 客户端连接时启用ssl
ssl.client.auth=required

重启 kafka

root@kafka-1:/opt/kafka/pki# systemctl restart kafka

客户端配置 TLS

创建客户端配置文件,指定证书信息 admin.properties文件内容如下:

root@kafka-1:/opt/kafka# cat > /opt/kafka/config/admin.properties << EOF
security.protocol=SSL
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
ssl.endpoint.identification.algorithm=
ssl.key.password=123.com
EOF

连接 kafka 集群测试

# 查看节点信息
root@kafka-1:/opt/kafka# bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.10.31:9092 --command-config /opt/kafka/config/admin.properties 
192.168.10.32:9092 (id: 2 rack: null) -> (
……
)
192.168.10.31:9092 (id: 1 rack: null) -> (
……
)
192.168.10.33:9092 (id: 3 rack: null) -> (
……
)
# 查看topic信息
root@kafka-1:/opt/kafka# bin/kafka-topics.sh --describe --bootstrap-server 192.168.10.31:9092 --command-config /opt/kafka/config/admin.properties
Topic: test     TopicId: RhzPTC_eRL-etwrn3p5z-g PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1        Elr:    LastKnownElr: 
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr:
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,3   Isr: 3,2        Elr:    LastKnownElr:

生产消费消息测试

# 生产者发送消息
root@kafka-2:/opt/kafka# bin/kafka-console-producer.sh --bootstrap-server 192.168.10.31:9092 --topic test --producer.config /opt/kafka/config/admin.properties
>hello tls
# 消费者接收消息
root@kafka-3:/opt/kafka# bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.31:9092 --topic test --from-beginning --consumer.config /opt/kafka/config/admin.properties
hello tls

kafka-ui 配置 TLS

修改 kafka-ui 配置文件

root@kafka-1:/opt/kafka-ui# cat > config.yml << EOF
kafka:
  clusters:
    -
      name: kafka-cluster
      bootstrapServers: 192.168.10.31:9092,192.168.10.32:9092,192.168.10.33:9092
      metrics:
        port: 9997
        type: JMX
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            location: /opt/kafka/pki/kafka.keystore.jks
            password: 123.com
        ssl_endpoint_identification_algorithm: ''
      ssl:
        truststorelocation: /opt/kafka/pki/kafka.truststore.jks
        truststorepassword: 123.com
EOF

重启 kafka-ui 并验证

root@kafka-1:/opt/kafka-ui# systemctl restart kafka-ui

PLAIN认证

在Kafka中,SASL(Simple Authentication and Security Layer)机制包括三种常见的身份验证方式:

  1. SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
  2. SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
  3. SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。

创建Kraft账号密码认证文件

以下操作在所有节点执行。

创建两个用户,分别为admin、test(此处仅用于演示,实际生产环境建议按业务创建多个不同的账号,并配置对指定 topic 的读写权限)

root@kafka-1:~
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="password"
    user_admin="password"
    user_test="test";
};
EOF

该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。

usemame和password指定该代理与集群其他代理初始化连接的用户名和密码

suer_admin=“password”,这个表示一个用户名为admin用户,密码是password,这个必须要有一个,且要这一个跟上面的username和password保持一致。

user_test=“test” 是第二个用户,表示的是用户名为test的账户,密码为test。

修改 kafka 配置文件

Kafka broker 的 server.properties 配置文件,来启用 SASL/PLAIN 认证。以下是需要配置的参数

root@kafka-1:~# vim /opt/kafka/config/kraft/server.properties
# 修改以下配置
listeners=SASL_SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_SSL
advertised.listeners=SASL_SSL://192.168.10.31:9092,CONTROLLER://192.168.10.31:9093
# 节点间CONTROLLER映射为SASL_PLAINTEXT认证
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 新增以下配置
# 设置 SASL 认证机制
sasl.enabled.mechanisms=PLAIN
# 集群间认证时用的认证方式
sasl.mechanism.inter.broker.protocol=PLAIN
# 指定Kafka 客户端与 Broker 之间使用的 SASL 认证机制
sasl.mechanism=PLAIN
# 指定控制器通信时使用的认证机制
sasl.mechanism.controller.protocol=PLAIN
# 配置 SASL 认证存储方式为文件
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 设置必须授权才能用
allow.everyone.if.no.acl.found=false
# 配置超级用户
super.users=User:admin

修改启动脚本

root@kafka-1:~# vim /opt/kafka/bin/kafka-server-start.sh
# 新增-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf参数
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
fi

重启 kafka 集群

root@kafka-1:~# systemctl restart kafka

客户端使用账户密码认证

  1. 修改客户端配置文件,新增认证信息
root@kafka-1:~# cat > /opt/kafka/config/admin.properties << EOF
bootstrap.servers=192.168.10.31:9092,192.168.10.32:9092,192.168.10.33:9092
ssl.keystore.location=/opt/kafka/pki/kafka.keystore.jks
ssl.keystore.password=123.com
ssl.truststore.location=/opt/kafka/pki/kafka.truststore.jks
ssl.truststore.password=123.com
ssl.endpoint.identification.algorithm=
ssl.key.password=123.com
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="password";
EOF
  1. 查看 boorker 信息
root@kafka-1:~# /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.10.31:9092 --command-config /opt/kafka/config/admin.properties
192.168.10.32:9092 (id: 2 rack: null) -> (
        ……
)
192.168.10.33:9092 (id: 3 rack: null) -> (
        ……
)
192.168.10.31:9092 (id: 1 rack: null) -> (
        ……
)
  1. 查看 topic 信息
root@kafka-1:~# /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.10.31:9092 --list --command-config /opt/kafka/config/admin.properties
.properties
__consumer_offsets
test
  1. 创建客户端认证文件
root@kafka-1:~# cat > /opt/kafka/config/client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="password";
};
EOF
  1. 修改生产者和消费者脚本,添加-Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf
root@kafka-1:~# vim /opt/kafka/bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf kafka.tools.ConsoleProducer "$@"
root@kafka-1:~# vim /opt/kafka/bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/client_jaas.conf org.apache.kafka.tools.consumer.ConsoleConsumer "$@"
  1. 生产者发送消息
root@kafka-1:~# /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.31:9092 --topic test --producer.config /opt/kafka/config/admin.properties
> hello kafka
  1. 消费者消费消息
root@kafka-1:~# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.31:9092 --topic test --consumer.config /opt/kafka/config/admin.properties --from-beginning
hello kafka

kafka-ui 使用账号密码认证

  1. 更新 kafka-ui 配置文件
root@kafka-1:~# vim /opt/kafka-ui/config.yml
kafka:
  clusters:
    -
      name: kafka-cluster
      bootstrapServers: 192.168.10.31:9092,192.168.10.32:9092,192.168.10.33:9092
      metrics:
        port: 9997
        type: JMX
      properties:
        security:
          protocol: SASL_SSL
        sasl:
          mechanism: PLAIN
          jaas:
            config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="password";
        ssl:
          keystore:
            location: /opt/kafka/pki/kafka.keystore.jks
            password: 123.com
        ssl_endpoint_identification_algorithm: ''
      ssl:
        truststorelocation: /opt/kafka/pki/kafka.truststore.jks
        truststorepassword: 123.com
  1. 重启 kafka-ui 验证
root@kafka-1:~# systemctl restart kafka-ui

ACL权限

在Kafka中,ACL(Access Control List)是用来控制谁可以访问Kafka资源(如主题、消费者组等)的权限机制。ACL配置基于Kafka的kafka-acls.sh工具,能够管理对资源的读取、写入等操作权限。

Kafka ACL的基本概念

Kafka的ACL是基于以下几个方面的:

  • 资源类型(Resource Type): Kafka支持多种资源类型,包括主题(Topic)、消费者组(Consumer Group)、Kafka集群本身(Cluster)等。
  • 操作类型(Operation Type): 如Read(读取)、Write(写入)、Create(创建)、Describe(描述)、Alter(修改)等。
  • 权限类型(Permission Type): Allow表示允许访问,Deny表示拒绝访问。
  • 主体(Principal): 访问Kafka的用户或客户端。Kafka支持通过SASL认证系统中的用户来定义主体,通常是User:<username>的形式。

查看现有ACL

root@kafka-1:/opt/kafka# bin/kafka-acls.sh --bootstrap-server 192.168.10.31:9092 --list --command-config /opt/kafka/config/admin.properties

添加ACL

给用户User:test添加对test主题的读取权限:

root@kafka-1:/opt/kafka# bin/kafka-acls.sh --bootstrap-server 192.168.10.31:9092 --add --allow-principal User:test --operation Read --topic test --command-config /opt/kafka/config/admin.properties
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
        (principal=User:test, host=*, operation=READ, permissionType=ALLOW)
  • –allow-principal: 允许访问的用户主体。
  • –operation: 操作类型,如ReadWrite等。
  • –topic top 名称。

通过 kafka-ui 查看验证

删除ACL

删除User:testtest主题的读取权限:

root@kafka-1:/opt/kafka# bin/kafka-acls.sh --bootstrap-server 192.168.10.31:9092 --remove --allow-principal User:test --operation Read --topic test --command-config /opt/kafka/config/admin.properties
Are you sure you want to remove ACLs: 
        (principal=User:test, host=*, operation=READ, permissionType=ALLOW) 
 from resource filter `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`? (y/n)
y

Kafka性能测试

kafka 不同的参数配置对 kafka 性能都会造成影响,通常情况下集群性能受分区、磁盘和线程等影响因素,因此需要进行性能测试,找出集群性能瓶颈和最佳参数。

测试工具

在 Kafka 安装目录 $KAFKA_HOME/bin/ 有以下跟性能相关的测试脚本:

# 生产者和消费者的性能测试工具
kafka-producer-perf-test.sh
kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh:用于测试Kafka Producer的性能,主要输出4项指标,总共发送消息量(以MB为单位),每秒发送消息量(MB/second),发送消息总数,每秒发送消息数(records/second)。

kafka-consumer-perf-test.sh:用于测试Kafka Consumer的性能,测试指标与Producer性能测试脚本一样

测试环境

  • 前置条件:3 个Broker(节点),1个Topic(主题),3个Partition(分区),1 个 Replication(副本),异步模式,消息Payload为300字节,消息数量 5000万,kafka 版本为 3.9.2
  • 硬件配置:4 核 CPU,8G 内存,1T HDD 硬盘
  • 测试工具:Kafka自带的基准工具

生产者基准测试

root@kafka-1:/opt/kafka# bin/kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 5000000 \
  --record-size 300 \
  --throughput -1 \
  --producer.config /opt/kafka/config/admin.properties \
  --print-metrics
1030901 records sent, 206180.2 records/sec (58.99 MB/sec), 436.0 ms avg latency, 915.0 ms max latency.
2392728 records sent, 478545.6 records/sec (136.91 MB/sec), 220.9 ms avg latency, 411.0 ms max latency.
5000000 records sent, 387176.707449 records/sec (110.77 MB/sec), 256.22 ms avg latency, 915.00 ms max latency, 222 ms 50th, 495 ms 95th, 765 ms 99th, 879 ms 99.9th.

Metric Name                                                                                                      Value
app-info:commit-id:{client-id=perf-producer-client}                                                            : a60e31147e6b01ee
app-info:start-time-ms:{client-id=perf-producer-client}                                                        : 1737285501613
app-info:version:{client-id=perf-producer-client}                                                              : 3.9.0
kafka-metrics-count:count:{client-id=perf-producer-client}                                                     : 149.000
producer-metrics:batch-size-avg:{client-id=perf-producer-client}                                               : 16128.974
producer-metrics:batch-size-max:{client-id=perf-producer-client}                                               : 16129.000
……

–topic 指定topic

–num-records 指定生产数据量

–throughput 指定吞吐量(-1表示无限制)

–record-size record数据大小

–producer.config 指定 kafka 客户端配置文件路径

–print-metrics 打印结果指标值

消费者基准测试

root@kafka-1:/opt/kafka# bin/kafka-consumer-perf-test.sh \
  --topic perf-test \
  --messages 50000000 \
  --consumer.config /opt/kafka/config/admin.properties \
  --bootstrap-server 192.168.10.31:9092,192.168.10.32:9092,192.168.10.33:9092 \
  --print-metrics
2025-01-19 19:23:35:491, 2025-01-19 19:23:53:343, 1430.5115, 80.1317, 5000000, 280080.6632, 3539, 14313, 99.9449, 349332.7744

Metric Name                                                                                                            Value
consumer-coordinator-metrics:assigned-partitions:{client-id=perf-consumer-client}                                    : 0.000
consumer-coordinator-metrics:commit-latency-avg:{client-id=perf-consumer-client}                                     : 11.333
consumer-coordinator-metrics:commit-latency-max:{client-id=perf-consumer-client}                                     : 29.000
……

查看更多

崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn


http://www.kler.cn/a/510670.html

相关文章:

  • ARM学习(42)CortexM3/M4 MPU配置
  • 总结3..
  • 三电平空间矢量详解
  • 基于32QAM的载波同步和定时同步性能仿真,包括Costas环的gardner环
  • DLNA库Platinum新增安卓64位so编译方法
  • 微透镜阵列精准全检,白光干涉3D自动量测方案提效70%
  • docker中常用的镜像和容器命令
  • day01_项目介绍和环境搭建
  • 新星杯-ESP32智能硬件开发--ESP32的I/O组成-系统中断矩阵
  • Ubuntu 22.04虚拟机安装配置调整(语言输入法字体共享剪切板等等
  • 第6章 ThreadGroup详细讲解(Java高并发编程详解:多线程与系统设计)
  • DDD - 微服务落地的技术实践
  • python 入门
  • 【Linux系统环境中使用二进制包安装Apache】
  • MySQL 创建数据库问题:You have an error in your SQL syntax(MySQL 数据库命名规则问题)
  • 闭包的理解及应用
  • # Rust Actix Web 入门指南
  • Avalonia系列文章之小试牛刀
  • 栈和队列经典例题
  • Git版本控制 – 创建和维护项目Repository
  • 数据结构漫游记:队列的动态模拟实现(C语言)
  • Python基础06(字符串格式化/操作方法)
  • Node.js 到底是什么
  • 微服务学习-OpenFeign 简化服务间调用
  • 【第二十周】U-Net:用于生物图像分割的卷积神经网络
  • ARM GCC编译器