当前位置: 首页 > article >正文

Python 全栈系列266 Kafka服务的Docker搭建

说明

在大量数据处理任务下的缓存与分发

这个算是来自顾同学的助攻+1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素,可以简单认为kafka的速度是10万/秒级的。

本次文章的目的是:

  • 1 搭建一个平时工作中常用的队列服务
  • 2 方便自己或者其他同事再次搭建

内容

1 搭建过程

共要搭建两个服务:zookeeper和kafka。

1.1 创建zookeeper

这个是基础服务,必须要最先启动

docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

通常来说,这个服务启动后就不用管了,但是偶尔如果需要debug的时候:

docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids

1.2 创建持久化路径

这个会实际保存kafka的消息

mkdir -p /data/kafka-logs

1.3 创建kafka

一种场景是只监听外网IP(WAN_IP),另一种场景是同时监听内外网(LAN_IP)。

只监听外网的比较简单

WAN_IP=111
LAN_IP=222
docker run -it --rm --name kafka \
 -p 24666:24666 \
 --link zookeeper:zk \
 -e HOST_IP=localhost \
 -e KAFKA_BROKER_ID=1 \
 -e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \
 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${WAN_IP}:24666  \
 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:24666 \
 -e KAFKA_LOG_DIRS=/data/kafka-logs \
 -v /data/kafka-logs:/data/kafka-logs \
 registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

同时监听内外网的比较麻烦(且要求端口不同)

WAN_IP=111
LAN_IP=222
docker run -d --name kafka \
  -p 24666:24666 \
  -p 9092:9092 \
  --link zookeeper:zk \
  -e HOST_IP=localhost \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \
  -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
  -e KAFKA_LISTENER_NAME=INTERNAL \
  -e KAFKA_LISTENER_NAME=EXTERNAL \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
  -e KAFKA_LOG_DIRS=/data/kafka-logs \
  -v /data/kafka-logs:/data/kafka-logs \
  registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

配置解释
KAFKA_LISTENERS:

  • INTERNAL://0.0.0.0:9092 用于所有网络接口监听。

  • EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。

  • KAFKA_ADVERTISED_LISTENERS:

  • INTERNAL://IP:9092 用于内网客户端。

  • EXTERNAL://IP:24666 用于外网客户端。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:

  • INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。

注释
• docker run -d --name kafka:启动一个名为 kafka 的容器,并在后台运行。
• -p 9092:9092:将主机的 9092 端口映射到容器的 9092 端口,这是 Kafka 的默认端口。
• --link zookeeper:zk:将名为 zookeeper 的容器链接到当前容器,并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IP=localhost:设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID=1:设置 Kafka 的 broker ID 为 1。【如果有多个,应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECT=zk:2181:指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx:9092:设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS=/data/kafka-logs:指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs:将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录,以持久化存储 Kafka 日志。

2 测试

2.1 生产者测试

from pydantic import BaseModel, field_validator
import json 
import pandas as pd 
class KafkaJsonMsgList(BaseModel):
    json_list : list

    @property
    def msg_list(self):
        return pd.Series(self.json_list).apply(json.loads).to_list()
    

from func_timeout import func_set_timeout,FunctionTimedOut

import json
from confluent_kafka import Producer
# @func_set_timeout(60)

def send_messages(bootstrap_servers = None, topic= None, messages= None):
    """
    发送消息到 Kafka 主题

    :param bootstrap_servers: Kafka 服务器地址
    :param topic: Kafka 主题
    :param messages: 要发送的消息列表
    """
    # 创建 Producer 实例
    producer = Producer(**{'bootstrap.servers': bootstrap_servers,'acks': 1 })

    for msg in messages:
        try:
            producer.produce(topic, msg)
        except BufferError:
            # 如果队列已满,等待队列空出空间
            producer.poll(1)
        # 定期调用poll以确保消息传递
        producer.poll(0)

    # 确保所有消息都被发送
    producer.flush()


msg_list = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(3)]
topic = 'my_test6'
# 外网
## bootstrap_servers = 'WAN_IP:24666'
# 内网
bootstrap_servers = 'LAN_IP:9092'

send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list)

2.2 消费者测试

from confluent_kafka import Consumer

# 如果是非json的,直接拿到就可以了
# @func_set_timeout(60)


def consume_messages(config = None, topic = None, max_messages = 3):
    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to topic
    consumer.subscribe([topic])
    consumed_count = 0

    res_list = []
    try:
        while consumed_count < max_messages:
            msg = consumer.poll(1.0)
            if msg is None:
                print('Empty Q')
                break 
            else:
                res_list.append(msg.value().decode('utf-8'))
                consumed_count += 1
            if consumed_count >= max_messages:
                break
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
    return res_list 


# 外网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'WAN_IP:24666',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
# 内网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'LAN_IP:9092',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()

2.3 性能测试

发送端,1.48秒发送10万条消息,稍微弱了点,不过考虑这个是一台仅仅4核8G且繁忙的机器,那就还好(我默认的方式是需要json序列化的)。


tick1 = time.time()
msg_list_10w = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(100000)]
topic = 'my_test6'
send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list_10w)
tick2 = time.time()
print('takes %.2f to send 100000' % (tick2-tick1))
takes 1.48 to send 100000
```

接收端
````python
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100000  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
print(tick2-tick1, 'get_time')
print(tick3-tick2, 'parse-time')

1.3391587734222412 get_time
0.24841904640197754 parse-time
```

总体上还是满意的,可以了。

http://www.kler.cn/a/286811.html

相关文章:

  • 1.CSS的三大特性
  • iOS开发设计模式篇第二篇MVVM设计模式
  • 联想电脑怎么用u盘装系统_联想电脑用u盘装win10系统教程
  • doris:腾讯云 COS导入数据
  • 图谱之前端关系应用
  • 靶机复现-pikachu靶机文件包含漏洞
  • ctfshow之web58~web71
  • Android --- transaction.commitAllowingStateLoss();和transcation.commit 有什么区别
  • J.U.C Review - volatile / synchronized / 锁 深入剖析
  • Java网络编程概述
  • 【maven】阿里云和apache仓库配置
  • Java 流过滤器是否足够智能,可以忽略有序流中不必要的项目吗?
  • 云计算实训40——部署project_exam_system项目及容器的编排
  • c++ 原型模式
  • 论文速读|通过人类远程操作的深度模仿学习框架:人型机器人的行走操纵技能
  • 【Pytorch】模型权重保存与上传
  • C#上位机采用数据库操作方式对Excel或WPS表格进行读取操作
  • 分布式系统中的Dapper与Twitter Zipkin:链路追踪技术的实现与应用
  • Ai产品经理的探索:技能、机遇与未来展望
  • 支付平台构建支付接口供整个公司调用—支付代理商
  • Git 学习
  • QT Sql 实现多个股票成交明细数据文件制成数据库并支持查询
  • Node原子计数器
  • 数据库性能测试2:内存数据库
  • 基于 Android Studio 实现的 记账本-MySQL版
  • [C#]国密SM2算法加解密字符串加密解密文件