Deploying Kafka with SASL_SSL Authentication in Docker and Integrating It with Spring Boot

1. Create the certificates and keys

This requires an OpenSSL environment. On Windows, download OpenSSL from:

Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions

You also need keytool, which ships with the JDK.

The username and password used throughout this example are:

kafka: kafka2024

2. Generate the CA certificate and key stores

# Create the CA
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem -days 3650 -nodes -passin pass:kafka2024 -passout pass:kafka2024 -subj "/CN=kafka CA"

# Generate the Kafka server key pair in a keystore
keytool -keystore kafka.keystore.jks -validity 3650 -genkey -keyalg RSA -storepass kafka2024 -keypass kafka2024 -dname "CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"

# Import the CA certificate into the Kafka server keystore
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert.pem -storepass kafka2024 -noprompt

# Create a certificate signing request (CSR)
keytool -keystore kafka.keystore.jks -certreq -file cert.csr -storepass kafka2024 -keypass kafka2024

# Sign the certificate with the CA
openssl x509 -req -CA ca-cert.pem -CAkey ca-key.pem -in cert.csr -out cert.pem -days 3650 -CAcreateserial -passin pass:kafka2024

# Import the signed certificate into the keystore
keytool -keystore kafka.keystore.jks -import -file cert.pem -storepass kafka2024 -keypass kafka2024 -noprompt

# Create the truststore and import the CA certificate
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert.pem -storepass kafka2024 -noprompt
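
As an optional sanity check, you can list what ended up in each store before moving on (the exact aliases shown will depend on the commands above):

keytool -list -keystore kafka.keystore.jks -storepass kafka2024
keytool -list -keystore kafka.truststore.jks -storepass kafka2024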

3. You now have kafka.keystore.jks and kafka.truststore.jks.

4. Prepare the image:

  1. bitnami/kafka:3.9.0 image package

5. The docker-compose.yml configuration file

Note: change the IP address to your host IP, 1.14.165.18 in this example.

version: '3.8'

services:
  kafka:
    image: 'bitnami/kafka:3.9.0'
    container_name: kafka
    ports:
      - '9092:9092'
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
    volumes:
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
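
Optionally, you can let Compose validate and render the file before starting anything:

sudo docker-compose config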

6. In the current directory, make sure the following three files are present:
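
That is, the compose file plus the two JKS stores created in step 3:

docker-compose.yml
kafka.keystore.jks
kafka.truststore.jks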

7. Start the Kafka service with:

sudo docker-compose up -d
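
If the container exits or authentication fails later, the broker logs are the first place to look:

sudo docker logs -f kafka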

8. Test and verify:

Test sending messages:

sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

An error shows up:

[2024-11-14 08:41:04,117] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,118] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,120] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
        at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:383)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:444)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:533)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:382)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:302)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:501)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:596)
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
        at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:569)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:490)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:251)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.security.cert.CertificateException: No subject alternative names present
        at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:142)
        at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:458)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:432)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:292)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
        at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329)

Solution:

Add the ssl.endpoint.identification.algorithm parameter with an empty value (this disables hostname verification) to the following two config files inside the container:

/opt/bitnami/kafka/config/producer.properties
/opt/bitnami/kafka/config/consumer.properties

ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm= 

If you cannot edit the files inside the container, copy them out, modify them, and copy them back (an in-place alternative is sketched after the commands below):

sudo docker cp kafka:/opt/bitnami/kafka/config/consumer.properties consumer.properties
sudo docker cp kafka:/opt/bitnami/kafka/config/producer.properties producer.properties


sudo vi consumer.properties
sudo vi producer.properties

sudo docker cp consumer.properties kafka:/opt/bitnami/kafka/config/consumer.properties
sudo docker cp producer.properties kafka:/opt/bitnami/kafka/config/producer.properties
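
As a rough alternative sketch, assuming the container allows exec as root and the config files are writable there, the line can also be appended in place:

sudo docker exec -u root kafka bash -c 'echo "ssl.endpoint.identification.algorithm=" >> /opt/bitnami/kafka/config/producer.properties'
sudo docker exec -u root kafka bash -c 'echo "ssl.endpoint.identification.algorithm=" >> /opt/bitnami/kafka/config/consumer.properties'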

Send a message again:

sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

9. In another window, test consuming messages:

sudo docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties

10. Integrate Kafka with Spring Boot

Configure the Kafka client in your Spring Boot application to use SASL_SSL authentication.
Add the Kafka client dependency to pom.xml:

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
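
Assuming the project inherits from spring-boot-starter-parent (or imports the Spring Boot BOM), no explicit version is needed. The test class in step 11 also relies on spring-boot-starter-test; if it is not already present, add it with test scope:

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>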

Configure application.yml, changing the IP address to match your host:

spring:
  application:
    name: ncc
  kafka:
    bootstrap-servers: 1.14.165.18:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";
      ssl.truststore.location: kafka.truststore.jks
      ssl.truststore.password: kafka2024
      ssl.keystore.location: kafka.keystore.jks
      ssl.keystore.password: kafka2024
      ssl.key.password: kafka2024
      ssl.endpoint.identification.algorithm:
      producer.ssl.endpoint.identification.algorithm:
      consumer.ssl.endpoint.identification.algorithm:


Then place the kafka.keystore.jks and kafka.truststore.jks files in the project root. The Kafka client treats ssl.keystore.location and ssl.truststore.location as plain file paths, so the relative paths above are resolved against the working directory the application is started from.

11. Create a KafkaTest test class

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

// NccApplication is the application's @SpringBootApplication class (assumed to be in the same package)
@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void send() throws Exception {
        // send() is asynchronous; get() blocks until the broker acknowledges the record,
        // so the test fails immediately if the SASL/SSL setup is wrong
        kafkaTemplate.send("test", "hhh").get();
    }

}

The test passes.
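
To check the consuming side from Spring as well, here is a minimal listener sketch (the class name and groupId are illustrative, not from the original project; it reuses the same spring.kafka.properties above and Spring Boot's default String deserializers):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaTestListener {

    // Consumes from the same "test" topic used by the producer test above
    @KafkaListener(topics = "test", groupId = "ncc-test-group")
    public void onMessage(String message) {
        System.out.println("Received from Kafka: " + message);
    }

}

Start the application and re-run the send test; the message should show up in the listener's console output.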

