当前位置: 首页 > article >正文

Python发送带key的kafka消息

在Python中发送带有键(key)的Kafka消息,通常会使用`confluent-kafka`或`kafka-python`这样的库。这里我将分别展示如何使用这两个库来实现这个功能。

 

### 使用 `confluent-kafka`

 

首先,确保你已经安装了`confluent-kafka`库。如果没有安装,可以使用pip进行安装:

```bash

pip install confluent-kafka

```

 

然后,你可以使用以下代码来发送带有键的消息:

```python

from confluent_kafka import Producer

 

def delivery_report(err, msg):

    """ Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush(). """

    if err is not None:

        print(f'Message delivery failed: {err}')

    else:

        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

 

# 配置生产者

conf = {'bootstrap.servers': 'localhost:9092'}

 

# 创建生产者实例

producer = Producer(conf)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.produce('my_topic', key=message_key, value=message_value, callback=delivery_report)

 

# 触发所有消息的回调函数

producer.flush()

```

 

### 使用 `kafka-python`

 

同样地,确保你已经安装了`kafka-python`库。如果未安装,可以通过pip安装:

```bash

pip install kafka-python

```

 

接下来,使用以下代码来发送带有键的消息:

```python

from kafka import KafkaProducer

 

# 创建生产者实例

producer = KafkaProducer(bootstrap_servers='localhost:9092',

                         key_serializer=str.encode,

                         value_serializer=str.encode)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.send('my_topic', key=message_key, value=message_value)

 

# 确保所有消息都已发送

producer.flush()

 

# 关闭生产者

producer.close()

```

 

在这两个例子中,我们创建了一个Kafka生产者,并指定了一个本地运行的Kafka服务器地址(`localhost:9092`)。然后,我们定义了要发送的消息的键和值,并调用了相应的方法来发送消息。对于`confluent-kafka`,我们还设置了一个回调函数来处理消息的交付结果。

 

请根据你的实际环境调整配置,例如Kafka服务器的地址等。希望这能帮助到你!如果有任何其他问题,请随时提问。


http://www.kler.cn/a/446637.html

相关文章:

  • 杂七杂八的网络安全知识
  • 面试题整理9----谈谈对k8s的理解2
  • DB-GPT 智谱在线模型配置
  • 复习打卡大数据篇——Hadoop HDFS 02
  • 笔记本电脑需要一直插着电源吗?电脑一直充电的利弊介绍
  • oracle 设置归档日志存放路径
  • TCP为什么需要三次握手和四次挥手?
  • 创新性融合丨卡尔曼滤波+目标检测 新突破!
  • C/C++语言基础--C++STL库之仿函数、函数对象、bind、function简介
  • 单元测试(C++)——gmock通用测试模版(个人总结)
  • Spring(三)-SpringWeb-概述、特点、搭建、运行流程、组件、接受请求、获取请求数据、特殊处理、拦截器
  • python实现word转html
  • AI大模型进一步推动了AI在处理图片、视频、音频、文本的等数据应用
  • 【新教程】非root用户给Ubuntu server设置开机自启服务-root用户给Ubuntu server设置开机自启服务
  • ArcGIS计算土地转移矩阵
  • 详细解释爬虫中的异常处理机制?
  • Rabbitmq实现延迟队列
  • Leetcode2545:根据第 K 场考试的分数排序
  • 26、基于SpringBoot的在线文档管理系统的设计与实现
  • R 基础运算
  • 基于卷积神经网络(CNN)和ResNet50的水果与蔬菜图像分类系统
  • 机器视觉检测相机基础知识 | 颜色 | 光源 | 镜头 | 分辨率 / 精度 / 公差
  • Leetcode 串联所有单词的子串
  • 【windows】sonarqube起不来的问题解决
  • 人脸修复与增强腾讯开源项目GFPGAN介绍
  • python rabbitmq实现简单/持久/广播/组播/topic/rpc消息异步发送可配置Django