当前位置: 首页 > article >正文

【FastAPI】实现服务器向客户端发送SSE(Server-Sent Events)广播

在FastAPI中实现服务器向客户端发送SSE(Server-Sent Events)广播,可以通过以下步骤实现。SSE是一种服务器推送技术,允许服务器发送实时数据到客户端,通常用于创建实时更新的应用程序。

1. 安装必要的依赖

首先,确保你已经安装了FastAPI和Uvicorn:

pip install fastapi uvicorn

2. FastAPI服务器实现SSE广播

FastAPI使用StreamingResponse可以实现SSE。以下是一个实现SSE广播的简单例子,服务器会定期向所有连接的客户端广播消息。

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from typing import List
import asyncio

app = FastAPI()

# 用于保存所有的客户端连接
clients: List[asyncio.Queue] = []

# SSE生成器,用于向客户端发送事件流
async def event_generator():
    queue = asyncio.Queue()  # 每个客户端拥有自己的消息队列
    clients.append(queue)
    try:
        while True:
            data = await queue.get()  # 等待新消息
            yield f"data: {data}\n\n"
    except asyncio.CancelledError:
        clients.remove(queue)  # 客户端断开连接时移除其队列
        raise

# SSE endpoint,用于客户端连接
@app.get("/sse")
async def sse(request: Request):
    # 返回SSE流
    return StreamingResponse(event_generator(), media_type="text/event-stream")

# 广播消息到所有客户端
async def broadcast_message(message: str):
    for queue in clients:
        await queue.put(message)  # 将消息放入所有客户端的队列中

# 定期发送广播消息
@app.on_event("startup")
async def startup_event():
    async def send_messages():
        count = 0
        while True:
            await asyncio.sleep(5)  # 每5秒发送一次消息
            await broadcast_message(f"Server message: {count}")
            count += 1

    asyncio.create_task(send_messages())

3. 客户端实现

客户端可以使用EventSource对象来接收SSE事件流。以下是一个简单的HTML客户端代码:

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <h1>Server-Sent Events</h1>
    <div id="messages"></div>

    <script>
        const eventSource = new EventSource("/sse");

        eventSource.onmessage = function(event) {
            const newMessage = document.createElement("div");
            newMessage.textContent = event.data;
            document.getElementById("messages").appendChild(newMessage);
        };

        eventSource.onerror = function() {
            console.log("Error occurred or connection closed.");
        };
    </script>
</body>
</html>

4. 运行FastAPI服务器

使用Uvicorn运行服务器:

uvicorn main:app --reload

这样,服务器会每5秒向所有连接的客户端广播消息。客户端只需要访问/sse即可建立与服务器的SSE连接,并实时接收服务器推送的消息。

关键点说明:

  • SSE(Server-Sent Events):服务器通过text/event-stream向客户端推送数据,客户端通过EventSource接收。
  • 广播实现:将消息放入所有客户端的队列中,确保每个连接的客户端都能接收到消息。
  • StreamingResponse:用于持续向客户端发送数据流。

这样,你就可以在FastAPI中实现一个SSE广播功能了。


http://www.kler.cn/a/305090.html

相关文章:

  • 【AI技术对电商的影响】
  • 985研一学习日记 - 2024.11.12
  • 大语言模型在序列推荐中的应用
  • 统信UOS开发环境支持Electron
  • [安洵杯 2019]easy_web 详细题解
  • 深入浅出rust内存对齐
  • [C#][IIS]framework4.0注册到IIS
  • 密码学---常见的其他密码
  • 第十一周:机器学习笔记
  • python 实现euclidean distance欧式距离算法
  • 分享JavaScript中直接调用CSS中的类名
  • 56 - I. 数组中数字出现的次数
  • CAN总线-STM32上CAN外设
  • ansible_find模块
  • 计算机四级数据库原理考试大纲.md
  • 37. MyBatis-Plus是什么?它与MyBatis的主要区别是什么?
  • 音视频开发常见的开源项目汇总
  • 新挂载的磁盘不能创建文件
  • 数据清洗-缺失值填充-随机森林填充
  • 【Django】Django Class-Based Views (CBV) 与 DRF APIView 的区别解析
  • Python基础语法(1)上
  • 基于微信小程序的图书馆预约占座系统
  • 【vscode】 快速生成react组件
  • uniapp小程序,使用腾讯地图获取定位
  • 监听html元素是否被删除,删除之后重新生成被删除的元素
  • PHP悦读随行一键借阅图书小程序