当前位置: 首页 > article >正文

python kafka 发送/接收 消息

首先有安装好的 kafka 环境,点我查看安装教程

环境安装

pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple/

生产者

import json
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors


def producer_demo():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'], 
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    
    future = producer.send(
        'mykafka',
        key='creater',  # 同一个key值,会被送至同一个分区
        value="{'creater':'zhangsan', 'date':'2023-04-04'}",
        partition=0)  # 向分区1发送消息
    future.get(timeout=100)

if __name__ == "__main__":
    producer_demo()

消费者

import json
import time
import traceback
from kafka import KafkaConsumer
from kafka.errors import kafka_errors

def consumer_demo():
    consumer = KafkaConsumer(
        'mykafka',
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest'
    )

    for message in consumer:
        print(message)
        print(json.loads(message.value))
        # print(a)

if __name__ == "__main__":
    while True:
        consumer_demo()
        time.sleep(1)

http://www.kler.cn/a/368914.html

相关文章:

  • vue文件转AST,并恢复成vue文件(适用于antdv版本升级)
  • Linux常用命令 yum 命令介绍
  • 【容器】容器化详解:提升开发与运维效率的关键技术
  • MySQL-事务隔离级别
  • 自旋锁原理及基于原子引用手写自旋锁
  • 基于Python大数据的王者荣耀战队数据分析及可视化系统
  • 协议 HTTP
  • WPF+MVVM案例实战(八)- 自定义开关控件封装实现
  • Docker 常用命令全解析:提升对雷池社区版的使用经验
  • 我在1024谈华为
  • SLAM是什么,分类
  • MySQL基础快速复习及高级语法学习
  • Maven入门到进阶:构建、依赖与插件管理详解
  • 在项目中如何实现 Redis 分布式锁?
  • Golang | Leetcode Golang题解之第506题相对名次
  • 【React系列五】—React学习历程的分享
  • C# OOP面试题精选 面向新手/SOLID原则/设计模式++ 长期更新
  • 为什么在网络中不能直接传输数据
  • Linux 文件权限管理:chown、chgrp 和 chmod 的使用及权限掩码规则
  • 高效集成钉钉报销到金蝶付款单的技术实现
  • CSRF 点击劫持
  • 摊牌了,创业失败了
  • python实战(二)——房屋价格回归建模
  • WebSocket学习笔记
  • Kafka之消费者客户端
  • 多模态大模型(MLLM)中的Connector