当前位置: 首页 > article >正文

【FastAPI】服务器使用SSE实现客户端之间的广播和点对点功能

在 FastAPI 中实现使用SSE实现客户端之间的广播和点对点功能,可以通过以下步骤实现:

  1. 安装依赖:

    FastAPI 本身就支持 SSE,你只需要安装 fastapiuvicorn 来运行应用程序。

    pip install fastapi uvicorn
    
  2. SSE 服务端代码:

    下面的代码展示了如何用 FastAPI 实现 SSE 服务器。我们将用一个全局字典 clients 来存储每个客户端的事件流,并支持广播和点对点消息发送。

    from fastapi import FastAPI, Request
    from fastapi.responses import StreamingResponse
    from starlette.responses import JSONResponse
    from typing import Dict
    import asyncio
    
    app = FastAPI()
    
    # 存储所有连接的客户端
    clients: Dict[str, asyncio.Queue] = {}
    
    # SSE 事件生成器
    async def event_generator(queue: asyncio.Queue):
        try:
            while True:
                data = await queue.get()
                yield f"data: {data}\n\n"
        except asyncio.CancelledError:
            pass
    
    # SSE 连接端点
    @app.get("/sse/{client_id}")
    async def sse(client_id: str):
        queue = asyncio.Queue()
        clients[client_id] = queue
        return StreamingResponse(event_generator(queue), media_type="text/event-stream")
    
    # 广播消息给所有客户端
    @app.post("/broadcast")
    async def broadcast_message(message: dict):
        for queue in clients.values():
            await queue.put(message['content'])
        return JSONResponse({"status": "Broadcasted"})
    
    # 点对点消息发送
    @app.post("/send/{client_id}")
    async def send_message(client_id: str, message: dict):
        if client_id in clients:
            await clients[client_id].put(message['content'])
            return JSONResponse({"status": f"Message sent to {client_id}"})
        return JSONResponse({"status": "Client not found"}, status_code=404)
    
    # 客户端断开连接处理
    @app.get("/disconnect/{client_id}")
    async def disconnect_client(client_id: str):
        if client_id in clients:
            del clients[client_id]
            return JSONResponse({"status": f"Disconnected client {client_id}"})
        return JSONResponse({"status": "Client not found"}, status_code=404)
    
    

功能说明:

  1. SSE 连接: 每个客户端可以通过 /sse/{client_id} 连接到 SSE 端点,其中 {client_id} 是该客户端的唯一标识符。每个连接都分配了一个独立的事件队列,用于接收消息。

  2. 广播消息: 客户端可以通过 /broadcast POST 请求向所有连接的客户端广播消息。

    请求体示例:

    {
      "content": "This is a broadcast message"
    }
    
  3. 点对点消息发送: 通过 /send/{client_id} POST 请求,可以将消息发送给指定的客户端。

    请求体示例:

    {
      "content": "Private message to specific client"
    }
    
  4. 客户端断开: 通过 /disconnect/{client_id} 可以手动断开特定客户端的连接。

客户端示例:

前端使用 JavaScript 来实现 SSE 客户端:

const clientId = 'client1';  // 替换为你客户端的唯一ID
const eventSource = new EventSource(`/sse/${clientId}`);

eventSource.onmessage = function(event) {
    console.log("New message from server:", event.data);
};

// 断开连接时
eventSource.close();

总结:

  1. event_generator 负责生成 SSE 消息流。
  2. clients 字典用于管理每个连接的客户端,允许广播和点对点消息。
  3. 每个客户端都可以通过唯一 ID 建立连接,发送或接收消息。

这种方式可以轻松扩展以支持更多的业务逻辑,例如认证、分组广播等。


http://www.kler.cn/news/314156.html

相关文章:

  • 给新人的python笔记(一)
  • 深度学习基本概念详解
  • flink on k8s
  • 79篇vs13篇!本周中国学者发文量远超外国学者| NHANES数据库周报(8.28~9.3)
  • 执行matlab后进行RTL功能仿真check
  • 基于开源鸿蒙(OpenHarmony)的【智能家居综合应用】系统
  • 代理模式---静态代理和动态代理
  • JVM内存学习
  • Lodash的特点和功能
  • WGAN算法
  • 信奥初赛解析:1-3-计算机软件系统
  • YOLOv5模型部署教程
  • 小阿轩yx-通过state模块定义主机状态
  • 【计网面试真题】If-Modified-Since和Etag有什么区别
  • WebServer
  • 6、等级保护政策内容
  • Go语言的垃圾回收(GC)机制的迭代和优化历史
  • Vision Based Navigation :针对航天领域的基于视觉导航机器学习应用生成训练数据集
  • Redis的AOF持久化、重写机制、RDB持久化、混合持久化
  • Springboot常见问题(bean找不到)
  • C#为任意组件开发登录功能的记录
  • android设置实现广告倒计时功能
  • [Python数据可视化]Plotly Express: 地图数据可视化的魅力
  • 第十九节:学习WebFlux与前端响应式-非阻塞-流式通讯(自学Spring boot 3.x的第四天)
  • Java操控Redis (面经之 使用Redis)
  • 【HTTP】构造HTTP请求和状态码
  • [译] Go语言的源起,发展和未来
  • Rust语言入门第七篇-控制流
  • Highcharts甘特图基本用法(highcharts-gantt.js)
  • 安装黑群晖系统,并使用NAS公网助手访问教程(好文)