使用 Python 操作 Kafka
第一步:安装 Python 的 Kafka 客户端模块(kafka-python)
pip install kafka-python
第二步:编写代码
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import time
class StationLog:
    """One simulated base-station call record.

    The instance attributes are serialized as-is by ``to_string``, so their
    names double as the JSON keys of the emitted message.
    """

    def __init__(self, station_id: str, call_out: str, call_in: str,
                 call_status: str, timestamp: int, call_duration: int) -> None:
        """Store the record fields.

        Args:
            station_id: identifier of the base station.
            call_out: calling number.
            call_in: called number.
            call_status: outcome of the call (e.g. "success", "fail").
            timestamp: time of the record, as supplied by the caller.
            call_duration: duration of the call, as supplied by the caller.
        """
        # Assignment order determines the key order of the JSON output.
        self.station_id = station_id
        self.call_out = call_out
        self.call_in = call_in
        self.call_status = call_status
        self.timestamp = timestamp
        self.call_duration = call_duration

    def to_string(self) -> str:
        """Return the record as a JSON object string built from its attributes."""
        return json.dumps(self.__dict__)
def _random_station_log():
    """Build one simulated station log record with randomly chosen fields.

    Returns:
        StationLog: a record whose status is drawn from a list in which
        "success" fills 9 of the 12 slots; only successful calls get a
        non-zero duration (10000-19000), all others get 0.
    """
    # "success" is repeated so that most simulated calls succeed.
    statuses = ["fail", "busy", "barring"] + ["success"] * 9
    call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)
    call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)
    call_status = random.choice(statuses)
    call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0
    return StationLog(
        "station_" + str(random.randint(0, 9)),
        call_out,
        call_in,
        call_status,
        int(time.time() * 1000),  # current time in epoch milliseconds
        call_duration,
    )


def _report_send_failure(exc):
    """Print the error of a message whose asynchronous delivery failed."""
    print(f"Failed to send message: {exc}")


def main():
    """Continuously publish simulated station logs to the Kafka topic ``topicA``.

    Runs until interrupted (Ctrl-C) or until an unexpected error occurs; in
    both cases buffered messages are flushed and the producer is closed.
    """
    # Address (host:port) of the Kafka cluster to connect to.
    producer = KafkaProducer(
        bootstrap_servers='bigdata01:9092',
        # to_string() already yields JSON text; running json.dumps on it again
        # would wrap the object in a quoted, escaped string, so only encode.
        value_serializer=lambda v: v.encode('utf-8'),
    )
    try:
        while True:
            # Randomly generate one station log record.
            payload = _random_station_log().to_string()
            print(payload)
            time.sleep(0.1 + random.randint(0, 99) / 100)
            try:
                # send() is asynchronous: delivery errors arrive on the
                # returned future, so report them through an errback.
                producer.send('topicA', payload).add_errback(_report_send_failure)
            except KafkaError as e:
                # Errors raised synchronously by send() itself.
                print(f"Failed to send message: {e}")
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this endless generator.
        pass
    finally:
        # Make sure every buffered asynchronous message is sent before exit.
        producer.flush()
        producer.close()
if __name__ == "__main__":
    # Start the simulator only when run as a script, not when imported.
    main()
以上案例演示了如何通过 Python 操作 Kafka:程序持续生成模拟的基站通话日志,并将其以 JSON 格式发送到 Kafka 的 topicA 主题中。