
A Distributed Kafka Cluster on Docker

Contents

1. Overview

2. Server Planning

3. docker-compose Files

   kafka{i}.yaml

   kafka-ui.yaml

4. Configuring Cluster Monitoring in kafka-ui

5. Parameter Reference

6. Test Scripts

   Producer (async): AsyncKafkaProducer1.py

   Consumer (async): AsyncKafkaConsumer1.py

7. References


1. Overview

  • Set up the Kafka cluster needed for a local development environment
  • Spread across 3 virtual machines, interconnected as Docker containers

2. Server Planning

Host            Ports             Notes
host001.dev.sb  9092, 9093, 9081  kafka0 node; port 9081 serves the kafka-ui web interface
host002.dev.sb  9092, 9093        kafka1 node
host003.dev.sb  9092, 9093        kafka2 node

3. docker-compose Files

kafka{i}.yaml

- {i} takes the values 0, 1, and 2
- Usernames and passwords are configured directly in the file

services:
  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka{i}
    hostname: kafka{i}
    restart: always
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID={i}
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093
      - KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka{i}:9094,CLIENT://:9095,EXTERNAL://kafka{i}:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # Log
      - KAFKA_CFG_LOG_RETENTION_HOURS=72
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CONTROLLER_USER=kfkuser
      - KAFKA_CONTROLLER_PASSWORD=youknow
      - KAFKA_INTER_BROKER_USER=kfkuser
      - KAFKA_INTER_BROKER_PASSWORD=youknow
      - KAFKA_CLIENT_USERS=kfkuser
      - KAFKA_CLIENT_PASSWORDS=youknow
      # Others
      - TZ=Asia/Shanghai
    volumes:
      - '/data0/Server/Db/kafka{i}:/bitnami/kafka'
    extra_hosts: 
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"
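Rather than editing the node id by hand in three copies of the file, the kafka{i}.yaml template can be rendered with plain string substitution. A minimal sketch, assuming the template is stored with the literal {i} placeholder shown above (the file names here are illustrative):

```python
# Render per-node compose files from the kafka{i}.yaml template.
# Assumes the template text contains the literal placeholder "{i}".

def render_node_yaml(template: str, node_id: int) -> str:
    """Replace every occurrence of the {i} placeholder with the node id."""
    return template.replace("{i}", str(node_id))


def write_node_files(template: str, node_count: int = 3) -> list[str]:
    """Write kafka0.yaml .. kafka{n-1}.yaml and return the generated names."""
    names = []
    for i in range(node_count):
        name = f"kafka{i}.yaml"
        with open(name, "w", encoding="utf-8") as f:
            f.write(render_node_yaml(template, i))
        names.append(name)
    return names
```

Each generated file is then deployed to its matching host and started with `docker compose -f kafka{i}.yaml up -d`.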

kafka-ui.yaml

services:
  kafka-ui:
    image: 'provectuslabs/kafka-ui:master'
    container_name: kafka-ui
    restart: always
    ports:
      - 9081:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=youknow
    extra_hosts: 
      - "kafka0:172.16.20.60"
      - "kafka1:172.16.20.61"
      - "kafka2:172.16.20.62"

4. Configuring Cluster Monitoring in kafka-ui
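The original article illustrates this step with screenshots of the kafka-ui configuration wizard, which are not reproduced here. Alternatively, the cluster connection can be declared up front through environment variables in kafka-ui.yaml; the variable names below follow the provectus/kafka-ui configuration docs and should be verified against the image version in use (the JAAS string mirrors the broker credentials from kafka{i}.yaml):

```yaml
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka0:9092,kafka1:9092,kafka2:9092
      - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="kfkuser" password="youknow";
```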

5. Parameter Reference

KAFKA_CFG_PROCESS_ROLES
    Roles this node plays: broker, controller, or both.
    Example: KAFKA_CFG_PROCESS_ROLES=controller,broker
KAFKA_KRAFT_CLUSTER_ID
    Cluster ID; must be identical on every node of the cluster
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
    Controller quorum voter list
KAFKA_CFG_CONTROLLER_LISTENER_NAMES
    Name(s) of the controller listener
KAFKA_CFG_NUM_PARTITIONS
    Default number of partitions for new topics
KAFKA_CFG_LISTENERS
    Addresses and ports the broker listens on
KAFKA_CFG_ADVERTISED_LISTENERS
    Addresses and ports advertised to clients
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
    Security protocol per listener; SASL_PLAINTEXT means authentication only,
    with unencrypted transport
KAFKA_CLIENT_USERS
    SASL client username(s)
KAFKA_CLIENT_PASSWORDS
    SASL client password(s)

# Clustering
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
    Replication factor of the internal __consumer_offsets topic, which stores
    consumer offsets
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
    Replication factor of the internal __transaction_state topic, which stores
    the transaction log
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
    Minimum number of in-sync replicas (ISR) for __transaction_state; the ISR
    is the set of replicas that are in sync with the leader

# Log
KAFKA_CFG_LOG_DIRS
    Log data directories
KAFKA_CFG_LOG_RETENTION_HOURS
    Maximum retention time; expired data is handled according to the
    log.cleanup.policy setting. Default is 168 hours (one week)
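As a sanity check on the replication settings above: with acks=all, a write succeeds as long as the in-sync replica count stays at or above the minimum ISR, so the cluster tolerates replication factor minus min ISR broker failures. A small illustrative helper (not part of Kafka's API):

```python
def tolerated_broker_failures(replication_factor: int, min_isr: int) -> int:
    """How many brokers can fail while acks=all writes still succeed."""
    return max(replication_factor - min_isr, 0)


# The cluster above uses RF=3 and min ISR=2 for __transaction_state,
# so transactional writes survive the loss of a single broker.
```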

6. Test Scripts

Producer (async): AsyncKafkaProducer1.py

from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "client.id": "PythonProducer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    produce_messages(async_producer, "zx001.msg.user", messages_to_send)

Consumer (async): AsyncKafkaConsumer1.py

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
import os
from datetime import datetime

# Ensure the log directory exists; logging.basicConfig does not create it
os.makedirs("logs", exist_ok=True)

# Configure the log format; '%(...)' placeholders refer to log record attributes
log_format = "%(message)s"
logging.basicConfig(
    filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)


async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)

        while True:
            # Poll for a message
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                # print(f"Raw message: {raw_message}")
                str_msg = raw_message.decode("utf-8")
                parsed_message = json.loads(str_msg)
                parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
                json_data = json.dumps(parsed_message, ensure_ascii=False)
                logging.info("{}".format(json_data))
            await asyncio.sleep(0.01)  # Briefly yield control to the event loop
    finally:
        # Close the consumer
        consumer.close()


async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092",
        "group.id": "MsgGroup2",
        "auto.offset.reset": "earliest",
        "client.id": "PythonConsumer",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "kfkuser",
        "sasl.password": "youknow",
    }

    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["zx001.msg.user"])


if __name__ == "__main__":
    asyncio.run(consume())
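The consumer appends one JSON object per line to logs/kafka_messages1.log, so the file can be post-processed as JSON lines. A minimal reader sketch (the helper name is an illustration, not part of the scripts above):

```python
import json


def load_jsonl(path: str) -> list[dict]:
    """Parse a JSON-lines file (one JSON object per line) into a list of dicts."""
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                records.append(json.loads(line))
    return records
```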

7. References

- Apache Kafka® Quick Start - Local Install With Docker
- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub
- https://juejin.cn/post/7187301063832109112

