【FastAPI】服务器使用SSE实现客户端之间的广播和点对点功能
在 FastAPI 中实现使用SSE实现客户端之间的广播和点对点功能,可以通过以下步骤实现:
-
安装依赖:
FastAPI 本身就支持 SSE,你只需要安装
fastapi
和uvicorn
来运行应用程序。pip install fastapi uvicorn
-
SSE 服务端代码:
下面的代码展示了如何用 FastAPI 实现 SSE 服务器。我们将用一个全局字典
clients
来存储每个客户端的事件流,并支持广播和点对点消息发送。from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from starlette.responses import JSONResponse from typing import Dict import asyncio app = FastAPI() # 存储所有连接的客户端 clients: Dict[str, asyncio.Queue] = {} # SSE 事件生成器 async def event_generator(queue: asyncio.Queue): try: while True: data = await queue.get() yield f"data: {data}\n\n" except asyncio.CancelledError: pass # SSE 连接端点 @app.get("/sse/{client_id}") async def sse(client_id: str): queue = asyncio.Queue() clients[client_id] = queue return StreamingResponse(event_generator(queue), media_type="text/event-stream") # 广播消息给所有客户端 @app.post("/broadcast") async def broadcast_message(message: dict): for queue in clients.values(): await queue.put(message['content']) return JSONResponse({"status": "Broadcasted"}) # 点对点消息发送 @app.post("/send/{client_id}") async def send_message(client_id: str, message: dict): if client_id in clients: await clients[client_id].put(message['content']) return JSONResponse({"status": f"Message sent to {client_id}"}) return JSONResponse({"status": "Client not found"}, status_code=404) # 客户端断开连接处理 @app.get("/disconnect/{client_id}") async def disconnect_client(client_id: str): if client_id in clients: del clients[client_id] return JSONResponse({"status": f"Disconnected client {client_id}"}) return JSONResponse({"status": "Client not found"}, status_code=404)
功能说明:
-
SSE 连接: 每个客户端可以通过
/sse/{client_id}
连接到 SSE 端点,其中{client_id}
是该客户端的唯一标识符。每个连接都分配了一个独立的事件队列,用于接收消息。 -
广播消息: 客户端可以通过
/broadcast
POST 请求向所有连接的客户端广播消息。请求体示例:
{ "content": "This is a broadcast message" }
-
点对点消息发送: 通过
/send/{client_id}
POST 请求,可以将消息发送给指定的客户端。请求体示例:
{ "content": "Private message to specific client" }
-
客户端断开: 通过
/disconnect/{client_id}
可以手动断开特定客户端的连接。
客户端示例:
前端使用 JavaScript 来实现 SSE 客户端:
const clientId = 'client1'; // 替换为你客户端的唯一ID
const eventSource = new EventSource(`/sse/${clientId}`);
eventSource.onmessage = function(event) {
console.log("New message from server:", event.data);
};
// 断开连接时
eventSource.close();
总结:
event_generator
负责生成 SSE 消息流。clients
字典用于管理每个连接的客户端,允许广播和点对点消息。- 每个客户端都可以通过唯一 ID 建立连接,发送或接收消息。
这种方式可以轻松扩展以支持更多的业务逻辑,例如认证、分组广播等。