当前位置: 首页 > article >正文

python kafka 发送/接收 消息

首先有安装好的 kafka 环境,点我查看安装教程

环境安装

pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple/

生产者

import json
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors


def producer_demo():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    
    future = producer.send(
        'mykafka',
        key='creater',  # 同一个key值,会被送至同一个分区
        value="{'creater':'zhangsan', 'date':'2023-04-04'}",
        partition=0)  # 向分区1发送消息
    future.get(timeout=100)

if __name__ == "__main__":
    producer_demo()

消费者

import json
import time
import traceback
from kafka import KafkaConsumer
from kafka.errors import kafka_errors

def consumer_demo():
    consumer = KafkaConsumer(
        'mykafka',
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest'
    )

    for message in consumer:
        print(message)
        print(json.loads(message.value))
        # print(a)

if __name__ == "__main__":
    while True:
        consumer_demo()
        time.sleep(1)

http://www.kler.cn/news/368914.html

相关文章:

  • Ceph 存储系统全解
  • 探秘 Feign 核心注解:@FeignClient 和 @EnableFeignClients 是如何打通微服务通信的 “任督二脉” 的?
  • Vue笔记-element ui中关于table的前端分页
  • CSS伪元素以及伪类和CSS特性
  • Python 判断键是否存在字典中(新手入门、实战案例)
  • 尽管加密货币被禁,中国仍是比特币挖矿巨头!不过主导地位正在转向美国?
  • 协议 HTTP
  • WPF+MVVM案例实战(八)- 自定义开关控件封装实现
  • Docker 常用命令全解析:提升对雷池社区版的使用经验
  • 我在1024谈华为
  • SLAM是什么,分类
  • MySQL基础快速复习及高级语法学习
  • Maven入门到进阶:构建、依赖与插件管理详解
  • 在项目中如何实现 Redis 分布式锁?
  • Golang | Leetcode Golang题解之第506题相对名次
  • 【React系列五】—React学习历程的分享
  • C# OOP面试题精选 面向新手/SOLID原则/设计模式++ 长期更新
  • 为什么在网络中不能直接传输数据
  • Linux 文件权限管理:chown、chgrp 和 chmod 的使用及权限掩码规则
  • 高效集成钉钉报销到金蝶付款单的技术实现
  • CSRF 点击劫持
  • 摊牌了,创业失败了
  • python实战(二)——房屋价格回归建模
  • WebSocket学习笔记
  • Kafka之消费者客户端
  • 多模态大模型(MLLM)中的Connector