当前位置: 首页 > article >正文

001 Kafka入门及安装

Kafka入门及安装

文章目录

  • Kafka入门及安装
    • 1.介绍
      • Kafka的基本概念和核心组件
    • 2.安装
      • 1.docker快速安装
          • zookeeper安装
          • kafka安装
        • 添加topic
        • 删除topic
        • kafka-ui安装
      • 2.Docker安装(SASL/PLAIN认证配置-用户名密码)

来源参考的deepseek,如有侵权联系立删

1.介绍

Kafka的基本概念和核心组件

Kafka是分布式流处理平台,由 Scala 和 Java 编写。Kafka 的核心概念和组件包括:

  1. Producer:发布消息的客户端,向Broker发送数据。
  2. Consumer:订阅消息的客户端,从Broker读取数据。
  3. Broker:Kafka服务节点,负责消息存储和转发。
  4. Topic:消息的逻辑分类单位,类似数据库表。
  5. Partition:主题的分区,实现并行处理和扩展性。
  6. Consumer Group:多个消费者组成的组,协同消费同一主题的不同分区。

详细介绍

  1. 主题(Topic) :主题是消息的逻辑分类单位,类似于数据库中的表。每个主题可以包含多个分区(Partition),用于实现水平扩展和高吞吐量。
  2. 分区(Partition) :分区是物理上的消息分组,每个分区是一个有序且不可变的消息队列。分区可以分布在不同的服务器上,以实现数据的分布式存储和处理。
  3. Broker(服务器) :Broker 是 Kafka 集群中的节点,负责接收和存储消息。一个集群可以由多个 Broker 组成,每个 Broker 可以管理多个分区和副本。
  4. 生产者(Producer) :生产者是向 Kafka 主题发送消息的客户端。生产者将消息发布到指定的主题中,消息会被路由到相应的分区。
  5. 消费者(Consumer) :消费者是从 Kafka 主题中拉取消息的客户端。消费者可以订阅一个或多个主题,并从这些主题中读取消息。消费者可以组成消费者组(Consumer Group),以实现消息的并行消费和故障恢复。
  6. 消费者组(Consumer Group) :消费者组由多个消费者组成,组内的每个消费者负责消费不同分区的消息,以实现高效的消息分发和容错能力。
  7. ZooKeeper:ZooKeeper 是一个分布式协调服务,用于管理 Kafka 集群的元数据,如主题、分区和副本的配置信息。ZooKeeper 还负责选举和维护集群的领导者。
  8. 日志(Log) :Kafka 使用日志来存储消息,每个主题的日志由多个分区组成,每个分区包含一系列有序的消息。日志可以进行压缩和持久化,以提高存储效率和数据可靠性。
  9. 偏移量(Offset) :偏移量用于跟踪消息的消费位置。消费者通过偏移量来确定从何处继续消费消息。
  10. 复制机制:Kafka 通过复制机制提高容错能力。每个分区可以有多个副本,即使部分 Broker 失效,服务仍可继续运行。

这些组件共同构成了 Kafka 的核心架构,使其能够实现高吞吐量、低延迟、可扩展性和持久化的消息传递和处理。

2.安装

1.docker快速安装

zookeeper安装
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka安装
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
--name kafka: 设置容器的名字为“kafka”。

-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。

--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。

--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。

--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。

--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。

--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。

wurstmeister/kafka: 使用的Docker镜像名字。

/opt/kafka/config/server.properties  #镜像内部配置文件地址
./kafka-server-start.sh -daemon ./config/server.properties  #启动命令
添加topic
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 --partitions 1 --topic mytest
删除topic
kafka-topics.sh \
  --delete --topic lx \
  --zookeeper 127.0.0.1:2181
kafka-ui安装
docker run --name kafka-ui  -p 19092:8080 \
    -e KAFKA_CLUSTERS_0_NAME=local \
    -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=127.0.0.1:9092 \
    -d provectuslabs/kafka-ui:latest         

2.Docker安装(SASL/PLAIN认证配置-用户名密码)

1.宿主机创建用户密码配置文件

#创建文件
touch   /dockerData/kafka/config/kafka_server_jaas.conf

2.编辑kafka_server_jaas.conf配置

KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="123456"
       user_admin="123456"
       user_moshangshang="123456";
};                            
user_admin="123456"
user_后面跟的是用户名,然后123456为密码,可配置多个用户密码

3.运行kafka容器

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
-v /dockerData/kafka/config/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env 192.168.1.250:9092 \
--env KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka

4.宿主机创建kafka的server.properties 配置

5.修改并添加认证相关配置

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.250:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

6.拷贝宿主机配置文件覆盖容器内部原本文件

docker  cp /dockerData/kafka/config/server.properties   kafka:/opt/kafka/config

7.重启容器
8.整合springboot的yml配置

#kafka配置
spring:
  kafka:
    #kafka集群地址
    bootstrap-servers: 192.168.25.100:9092
    producer:
      #批量发送的数据量大小
      batch-size: 1
      #可用发送数量的最大缓存
      buffer-memory: 33554432
      #key序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #达到多少时间后,会发送
      properties:
        linger.ms: 1
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
      #代表集群中从节点都持久化后才认为发送成功
      acks: -1
    consumer:
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
      group-id: test
      auto-offset-reset: earliest
    listener:
      ack-mode: MANUAL # 精准控制offset提交
      concurrency: 3 # 并发消费者数
      type: batch

9.kafka-ui配置认证

docker run --name kafka-ui  -p 19092:8080 \
	-e KAFKA_CLUSTERS_0_NAME=local \
	-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=192.168.1.250:9092 \
	-e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT \
    -e KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN \
    -e KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='moshangshang' password='123456';" \
	-d provectuslabs/kafka-ui:latest

http://www.kler.cn/a/568055.html

相关文章:

  • DeepSeek本地部署+自主开发对话Web应用
  • 博云先进算力管理平台AIOS已上线全尺寸DeepSeek系列模型
  • 高可用、高性能、负载均衡集群的区别
  • 蓝桥杯 团建
  • 【目录】系统架构设计师—学习笔记
  • ES scroll=1m:表示快照的有效时间为1分钟。怎么理解
  • React 源码揭秘 | Effect更新流程
  • Unity小功能实现:鼠标点击移动物体
  • Spring AI:让AI应用开发更简单
  • 回归预测 | Matlab基于SSA-BiLSTM-Attention的数据多变量回归预测(多输入单输出)
  • AI人工智能机器学习之神经网络
  • springBoot连接远程Redis连接失败(已解决)
  • 最新Git入门到精通完整教程
  • Python办公自动化教程(008):设置excel单元格边框和背景颜色
  • Windows 11 下正确安装 Docker Desktop 到 D 盘的完整教程
  • EasyRTC嵌入式WebRTC技术与AI大模型结合:从ICE框架优化到AI推理
  • 基于 SSM+Vue的 车辆管理系统 系统的设计与实现
  • Brave 132 编译指南 Android 篇 - 配置编译环境 (五)
  • 从JSON过滤到编程范式:深入理解JavaScript数据操作
  • MySQL在线、离线安装