基于docker部署kafka-3.8.0版本,并开启SASL认证模式
1、下载安装包
(1)https://kafka.apache.org/downloads 下载如下图版本
2、解压安装包
执行tar -xvf kafka_2.13-3.8.0.tgz命令对安装包进行解压。
3、增加配置文件
(1)进入 /kafka_2.13-3.8.0/config 目录
(2)创建kafka_server_jaas.conf 文件 内容如下图
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName = "kafka3.8.0"
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
为broker增加SASL认证信息 用户名/密码均为admin
(3)修改server.properties 文件,增加下面信息,内容如下图
# 安全内部协议,指定使用SASL协议
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL认证机制,默认使用PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
#超级管理员权限用户
super.users=User:admin
#监听
listeners=SASL_PLAINTEXT://localhost:9092
#这个地址是客户端用来连接到Kafka Broker的地址
advertised.listeners=SASL_PLAINTEXT://localhost:9092
(4)创建kafka_consumer_jaas.conf 文件 内容如下图
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
为消费者客户端增加SASL认证信息 对应broker的用户名/密码
(5)创建kafka_producer_jaas.conf 文件 内容如下图
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
为生产者客户端增加SASL认证信息 对应broker的用户名/密码
(6)创建 kafka_topic_jass.properties 文件 内容如下图
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
为topic增加SASL认证信息,指定认证机制为SASL。
(7)修改 producer.properties 文件 在最下面增加如下代码
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
(8)修改 consumer.properties 文件 在最下面增加如下代码
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4、修改启动脚本
(1)修改 kafka-server-start.sh 脚本 在脚本最上方增加如下代码。
export KAFKA_OPTS=“-Djava.security.auth.login.config=/kafka_2.13-3.8.0/config/kafka_server_jaas.conf”
其中 路径为自己kafka的安装路径
(2)修改 kafka-console-producer.sh 脚本 在脚本最上方增加如下代码。
export KAFKA_OPTS=" -Djava.security.auth.login.config=/kafka_2.13-3.8.0/config/kafka_producer_jaas.conf""
其中 路径为自己kafka的安装路径
(2)修改 kafka-console-consumer.sh 脚本 在脚本最上方增加如下代码。
export KAFKA_OPTS=" -Djava.security.auth.login.config=/kafka_2.13-3.8.0/config/kafka_consumer_jaas.conf"
其中 路径为自己kafka的安装路径
5、启动zookeeper服务
(1)执行
./zookeeper-server-start.sh ../config/zookeeper.properties &
命令,后台运行zookeeper
(2)执行
lsof -i:2181
查看zookeeper是否正确启动,如无异常则如下图
6、启动kafka服务
(1)执行 ./kafka-server-start.sh …/config/server.properties & 命令 后台运行
(2)执行 lsof -i:9092 查看kafka是否正常启动,如无异常则如下图
7、创建topic
执行
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic zhangsan --command-config ../config/kafka_topic_jass.properties
创建topic
执行
./kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config ../config/kafka_topic_jass.properties
查询已创建的topic列表
8、启动生产者测试发送数据
执行
./kafka-console-producer.sh --broker-list localhost:9092 --topic zhangsan --producer.config ../config/producer.properties
9、启动消费者测试接收消息
执行
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic zhangsan --consumer.config ../config/consumer.properties
可以看到 成功消费到了消息
可使用 KAFKA-king 测试