当前位置: 首页 > article >正文

kafka实时返回浏览数据

在安装完kafka(Docker安装kafka_docker 部署kafka-CSDN博客),查看容器是否启动:

docker ps | grep -E 'kafka|zookeeper'

再用python开启服务

from fastapi import FastAPI, Request
from kafka import KafkaProducer
import kafka
import json
import logging
from datetime import datetime

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

# 初始化 FastAPI 应用
app = FastAPI()

# 示例博客文章数据
blog_posts = [
    {"id": 1, "title": "First Post", "content": "This is the first post."},
    {"id": 2, "title": "Second Post", "content": "This is the second post."}
]

def produce_view_event(ip_address, post_id):
    """
    生成博客文章的查看事件。

    参数:
        ip_address (str): 查看者的 IP 地址。
        post_id (int): 被查看的文章 ID。
    """
    logging.info(f"生成查看事件,文章 ID: {post_id},IP 地址: {ip_address}")

    try:
        # 初始化 Kafka 生产者
        producer = KafkaProducer(
            bootstrap_servers='110.40.130.231:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        # 准备发送到 Kafka 的消息
        message = {
            "ip_address": ip_address,
            "post_id": post_id,
            "event_type": "view"
        }

        logging.info(f"发送消息到 Kafka: {message}")
        future = producer.send('blog_views', value=message)

        try:
            # 等待消息成功发送
            record_metadata = future.get(timeout=10)
            logging.info(
                f"消息发送成功。主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
        except Exception as e:
            logging.error(f"发送消息失败: {e}")

        # 确保所有消息已发送并关闭生产者
        producer.flush()
        producer.close()

        # 将查看事件打印到控制台
        print_view_event(ip_address, post_id)
    except kafka.errors.NoBrokersAvailable as e:
        logging.error(f"没有可用的 Broker: {e}")

def print_view_event(ip_address, post_id):
    """
    打印博客文章的查看事件。

    参数:
        ip_address (str): 查看者的 IP 地址。
        post_id (int): 被查看的文章 ID。
    """
    event_type = "view"
    created_at = datetime.now().isoformat()
    print(
        f"View Event - IP Address: {ip_address}, Post ID: {post_id}, Event Type: {event_type}, Created At: {created_at}")

@app.get("/posts/{post_id}")
def get_post(post_id: int, request: Request):
    """
    根据 ID 获取博客文章。

    参数:
        post_id (int): 博客文章的 ID。
        request (Request): 进来的请求对象。

    返回:
        dict: 如果找到文章则返回文章,否则返回错误信息。
    """
    logging.info(f"收到请求,文章 ID: {post_id}")
    for post in blog_posts:
        if post["id"] == post_id:
            logging.info(f"找到文章: {post}")
            produce_view_event(request.client.host, post_id)
            return post
    return {"error": "文章未找到"}

if __name__ == "__main__":
    import uvicorn
    import os

    # 获取当前文件名(不带扩展名)供 UVicorn 使用
    app_modeel_name = os.path.basename(__file__).replace(".py", "")
    print(app_modeel_name)

    # 使用 UVicorn 运行 FastAPI 应用
    uvicorn.run(f"{app_modeel_name}:app", host='0.0.0.0', port=1213, reload=True)

访问:http://110.40.130.231:1213/posts/1


http://www.kler.cn/a/377896.html

相关文章:

  • 如何实现圆形头像功能
  • Cline 3.0发布:从AI编程助手到通用智能体平台的进化
  • 财会〔2024〕22号发布,全面提高管理会计数字化、智能化水平,泛微·齐业成来助力
  • Spring Boot 多数据源解决方案:dynamic-datasource-spring-boot-starter 的奥秘
  • linux系统编程(五)
  • leetcode-128.最长连续序列-day14
  • 迷宫求解:探索最优路径的算法与应用
  • Java接入Hive
  • IMX6ULL裸机-汇编_反汇编_机器码
  • win10 更新npm 和 node
  • Redis系列---常见问题
  • Hadoop生态圈框架部署(一)- Linux操作系统安装及配置
  • [CARLA系列--01]CARLA 0.9.15 在Windows下的安装教程(一)
  • 系统架构师如何备考-超有用的备考经验(送博主用到的资料)
  • RHCE——DNS域名解析服务器、selinux、防火墙
  • 字符串-05-字符串合并处理
  • 抗疫物资智能管理:SpringBoot技术探索
  • 两数之和笔记
  • redis v6.0.16 安装 基于Ubuntu 22.04
  • (蓝桥杯C/C++)——STL(上)
  • 使用代理和不使用代理request获取host、scheme、url、ip区别
  • FOYA传媒科技招聘
  • 第五项修炼—系统思考
  • 二分查找算法—C++
  • 【机器学习】18. 反向传播 Backpropagation algorithm, 学习率,动量Momenetum, Xavier,梯度消失
  • 实用篇:Postman历史版本下载