kafka实时返回浏览数据
在安装完kafka(Docker安装kafka_docker 部署kafka-CSDN博客),查看容器是否启动:
docker ps | grep -E 'kafka|zookeeper'
再用python开启服务
from fastapi import FastAPI, Request
from kafka import KafkaProducer
import kafka
import json
import logging
from datetime import datetime
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
# 初始化 FastAPI 应用
app = FastAPI()
# 示例博客文章数据
blog_posts = [
{"id": 1, "title": "First Post", "content": "This is the first post."},
{"id": 2, "title": "Second Post", "content": "This is the second post."}
]
def produce_view_event(ip_address, post_id):
"""
生成博客文章的查看事件。
参数:
ip_address (str): 查看者的 IP 地址。
post_id (int): 被查看的文章 ID。
"""
logging.info(f"生成查看事件,文章 ID: {post_id},IP 地址: {ip_address}")
try:
# 初始化 Kafka 生产者
producer = KafkaProducer(
bootstrap_servers='110.40.130.231:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 准备发送到 Kafka 的消息
message = {
"ip_address": ip_address,
"post_id": post_id,
"event_type": "view"
}
logging.info(f"发送消息到 Kafka: {message}")
future = producer.send('blog_views', value=message)
try:
# 等待消息成功发送
record_metadata = future.get(timeout=10)
logging.info(
f"消息发送成功。主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
except Exception as e:
logging.error(f"发送消息失败: {e}")
# 确保所有消息已发送并关闭生产者
producer.flush()
producer.close()
# 将查看事件打印到控制台
print_view_event(ip_address, post_id)
except kafka.errors.NoBrokersAvailable as e:
logging.error(f"没有可用的 Broker: {e}")
def print_view_event(ip_address, post_id):
"""
打印博客文章的查看事件。
参数:
ip_address (str): 查看者的 IP 地址。
post_id (int): 被查看的文章 ID。
"""
event_type = "view"
created_at = datetime.now().isoformat()
print(
f"View Event - IP Address: {ip_address}, Post ID: {post_id}, Event Type: {event_type}, Created At: {created_at}")
@app.get("/posts/{post_id}")
def get_post(post_id: int, request: Request):
"""
根据 ID 获取博客文章。
参数:
post_id (int): 博客文章的 ID。
request (Request): 进来的请求对象。
返回:
dict: 如果找到文章则返回文章,否则返回错误信息。
"""
logging.info(f"收到请求,文章 ID: {post_id}")
for post in blog_posts:
if post["id"] == post_id:
logging.info(f"找到文章: {post}")
produce_view_event(request.client.host, post_id)
return post
return {"error": "文章未找到"}
if __name__ == "__main__":
import uvicorn
import os
# 获取当前文件名(不带扩展名)供 UVicorn 使用
app_modeel_name = os.path.basename(__file__).replace(".py", "")
print(app_modeel_name)
# 使用 UVicorn 运行 FastAPI 应用
uvicorn.run(f"{app_modeel_name}:app", host='0.0.0.0', port=1213, reload=True)
访问:http://110.40.130.231:1213/posts/1