当前位置: 首页 > article >正文

使用python操作kafka

第一步:安装kafka的模块

pip install kafka-python

第二步:编写代码

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import time

class StationLog:
    def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):
        self.station_id = station_id
        self.call_out = call_out
        self.call_in = call_in
        self.call_status = call_status
        self.timestamp = timestamp
        self.call_duration = call_duration

    def to_string(self):
        return json.dumps(self.__dict__)

def main():
    # 设置连接kafka集群的ip和端口
    producer = KafkaProducer(bootstrap_servers='bigdata01:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    arr = ["fail", "busy", "barring", "success", "success", "success", "success", "success", "success", "success", "success", "success"]

    while True:
        call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)
        call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)
        call_status = random.choice(arr)
        call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0

        # 随机产生一条基站日志数据
        station_log = StationLog(
            "station_" + str(random.randint(0, 9)),
            call_out,
            call_in,
            call_status,
            int(time.time() * 1000),  # 当前时间戳
            call_duration
        )
        print(station_log.to_string())
        time.sleep(0.1 + random.randint(0, 99) / 100)

        try:
            # 发送数据到Kafka
            producer.send('topicA', station_log.to_string())
        except KafkaError as e:
            print(f"Failed to send message: {e}")

        # 确保所有异步消息都被发送
        producer.flush()

if __name__ == "__main__":
    main()

以上案例是通过python操作kafka,将一些模拟数据发送到kafka中。


http://www.kler.cn/a/398871.html

相关文章:

  • 问题分析与解决:Android开机卡动画问题分析
  • 二、神经网络基础与搭建
  • react中如何在一张图片上加一个灰色蒙层,并添加事件?
  • 技术理论||02空中三角测量
  • ReactPress与WordPress:两大开源发布平台的对比与选择
  • 如何编译 Cesium 源码
  • 天空地一体化立体感知智慧环保解决方案
  • 【C】文件的写入与读取
  • Python中的TCP
  • 鸿蒙Navigation入门使用
  • 【java】链表:找到成环的起始节点
  • git,ssh免密公钥配置,gitee为例,GitHub,gitlab同理
  • uniapp如何i18n国际化
  • 【flutter】flutter2升级到3.
  • 【Go 开发】pprof 排查问题流程:排查程序 CPU 占用高的问题
  • 跨平台WPF框架Avalonia教程 五
  • 【Java豆瓣电影爬虫】——抓取电影详情和电影短评数据 -
  • Gin 框架中间件详细介绍
  • 解析煤矿一张图
  • 【专题】计算机网络之网络层
  • c ++零基础可视化——数组
  • C++中的桥接模式
  • 为什么要使用Ansible实现Linux管理自动化?
  • uniapp微信小程序接入airkiss插件进行WIFI配网
  • ODOO学习笔记(7):模块化架构(按需安装)
  • 基于Java Springboot宠物救助管理系统