当前位置: 首页 > article >正文

Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# 发送消息的函数
def send_message(topic, message):
    # 将消息转换为字节
    producer.send(topic, json.dumps(message).encode('utf-8'))
    producer.flush()

if __name__ == '__main__':
    # 创建'test'主题
    topic = 'test'
    # 发送消息
    i = 1
    while True:
        message = {'num': i, 'msg': f'Hello Kafka {i}'}
        send_message(topic, message)
        i += 1
        time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaConsumer实例
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',  # 从最新的消息开始消费
    # auto_offset_reset='earliest',  # 从最早的offset开始消费
    enable_auto_commit=True,  # 自动提交offset
    group_id='my-group'  # 消费者组ID
)

# 消费消息
for message in consumer:
    # 将接收到的消息解码并转换为字典
    message = json.loads(message.value.decode('utf-8'))
    print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic

# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)


# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)

# 删除主题
admin_client.delete_topics(topics=['test-topic'])


http://www.kler.cn/a/324279.html

相关文章:

  • 前后端请求响应
  • 《生成式 AI》课程 第3講 CODE TASK 任务3:自定义任务的机器人
  • 基于opencv制作GUI界面
  • 海康大华宇视视频平台EasyCVR私有化视频平台服务器选购主要参数有哪些?
  • 8.C++面向对象5(实现一个较为完善的日期类)
  • 【PYTORCH】使用MTCNN和InceptionResnetV1简单进行人脸检测和相似度匹配
  • [leetcode]53_最大子数组(序列)和
  • 2024年空间计算/XR的现状:双子座阶段的探索与展望
  • 关于电商API接口接入及其相关应用分析【主流电商API接口】
  • 音视频处理工具FFmpeg与Java结合的简单使用
  • 【计算机网络 - 基础问题】每日 3 题(二十七)
  • Stable Diffusion绘画 | Checkpoint Merger 模型融合
  • 如何区分这个ip是真实ip,不是虚假的ip
  • 论文阅读 - SWATTING Spambots: Real-time Detection of Malicious Bots on X
  • RabbitMQ的高级特性-延迟队列
  • 个人计算机与网络的安全
  • 初探shell与bash使用指南
  • spring cloud Gateway网关
  • 网络编程(12)——完善粘包处理操作(id字段)
  • 【最新】微信小程序连接onenet——stm32+esp8266+onenet实现查看温湿度,控制单片机
  • 探索CefSharp,Cefsharp浏览器能做自动填表和模拟登录
  • 长芯微LPQ76930锂电池组保护芯片完全P2P替代BQ76930
  • 江协科技STM32学习- P20 实验-TIM编码器接口测速
  • windows安装Redis以后配置远程访问
  • 深度学习框架的选择:深入比较PyTorch与TensorFlow
  • Type-C接口桌面显示器的优势