当前位置: 首页 > article >正文

前后端分离项目实现SSE

SSE介绍

在日常web开发中经常会遇到查看数据最新状态的业务场景,例如查看任务状态与日志内容等。比较场景的解决方案是轮循和SSE。

Server-Sent Events (SSE) 是一种允许服务器通过单向通道向客户端推送更新的技术。它基于HTTP协议,客户端使用一个标准的HTTP请求来连接到服务器,并保持这个连接打开,服务器可以在这个连接上持续地发送数据。

SSE的工作原理

  1. 客户端通过HTTP请求连接到服务器,并保持连接不断开。
  2. 服务器推送text/event-stream类型的数据给客户端。
  3. 客户端监听服务器发送的消息,并进行处理。

SSE对比WebSocket

通信方式

  • SSE 是单向的,服务器向客户端推送消息,客户端不能主动向服务器发送数据。
  • WebSocket 是双向的,允许客户端和服务器之间的双向通信。

协议

  • SSE基于HTTP协议,使用HTTP/1.1HTTP/2,更适合需要基于现有HTTP/HTTPS基础设施的应用。
  • WebSocket 是独立协议,虽然也从HTTP握手开始,但连接升级为WebSocket协议后不再使用HTTP,适合需要低延迟、双向通信的应用。

重连与状态管理

  • SSE内置自动重连机制,如果连接断开,客户端会自动尝试重新连接。
  • WebSocket需要额外的代码来处理断开后的重连机制。

SSE对比HTTP轮询

通信效率

  • HTTP轮询客户端周期性地发送请求以检查服务器是否有新数据,这种方法在没有新数据时会产生大量无效请求,浪费带宽和资源。
  • SSE可以保持一个持久连接,只在有新数据时推送消息,效率更高。

实时性

  • HTTP轮询由于间隔请求,通常会有较大的延迟。
  • SSE提供准实时的推送服务,延迟较小。

SSE的优势

  • 简单实现: SSE是基于HTTP的标准,使用方便,客户端只需通过标准的JavaScript API就可以处理消息推送。
  • 自动重连: SSE内置自动重连机制,开发者不需要额外编写代码来处理连接丢失问题。
  • 消息订阅: SSE支持事件流的多类型消息订阅,客户端可以根据不同的事件类型处理不同的数据。
  • 高兼容性: SSE使用HTTP协议,兼容性好,适用于各种网络环境。

SSE的局限性

  • 单向通信: 只能从服务器推送消息到客户端,不能实现双向通信。如果需要双向通信,WebSocket更为合适。
  • 浏览器兼容性: 虽然现代浏览器都支持SSE,但某些旧版本浏览器可能需要使用Polyfill等兼容解决方案。
  • 消息丢失: 如果连接断开且未能自动重连,有可能会丢失未接收到的消息。

适用场景

SSE非常适合需要从服务器向客户端推送实时更新的应用场景,例如:

  • 实时数据更新(例如新闻、股票行情)
  • 服务器事件通知
  • 轻量级的实时通信应用

在需要简单、可靠的服务器推送方案且无需双向通信的场景下,SSE是一个不错的选择。如果需要更复杂的通信机制,WebSocket可能更为合适。

Django后端SSE

创建sse应用

首先创建一个新的APP来实现SSE的功能。

python manage.py startapp sse

在settings.py的INSTALLED_APPS中添加新的APP。

INSTALLED_APPS = [
    ...,
    "sse",
]

创建路由

接下来修改根路由,新增sse前缀的子路由配置,并在APP目录中,新建一个urls.py文件来处理指向这个应用的请求。

  • DRF/urls.py
from django.contrib import admin
from django.urls import path, include, re_path
from django.views.static import serve
from api import views
from django.conf import settings
urlpatterns = [
    ……
    # SSE
    path('v1/sse/', include('sse.urls', namespace='sse'))
]
  • sse/urls.py
from rest_framework import routers
from sse import views
from django.urls import path

app_name = "sse"
urlpatterns = [
    # SSE
    path('demo/', views.DemoAPIView.as_view())
]
router = routers.DefaultRouter()
urlpatterns += router.urls

创建视图函数

创建APIView视图,模拟后端实时触发消息通知。需要注意的是后端发送符合SSE协议的消息必须满足以下要求:

  1. 消息格式正确。SSE 要求每个消息必须以 data: 开头,且后面有一个换行符分隔。例如:
data: This is a message

data: {"message": "This is a message"}
  1. 响应内容类型设置:服务器响应的 Content-Type 必须设置为 text/event-stream,否则浏览器和客户端会将其视为普通的 HTTP 响应。
  2. 后端跨域配置:如果你的请求是跨域的,确保后端 CORS 配置允许 EventSource 的跨域请求,具体可参考文档:跨域访问-崔亮的博客 (cuiliangblog.cn)

通常情况下我们都会用异步实现SSE,所以生成器event_stream()也得是异步的。如果我们需要处理长时间的任务,那么可以写一个异步函数并通过yield返回状态。

  • sse/views.py
import asyncio
import random
import time
from django.http import StreamingHttpResponse
from loguru import logger
from rest_framework.views import APIView


class DemoAPIView(APIView):
    """
    SSE示例数据
    """

    @staticmethod
    def get(request):
        # 定义一个生成器,持续输出数据流
        async def event_stream():
            while True:
                value = random.randint(0, 5)
                await asyncio.sleep(value)  # 模拟随机延时
                msg = 'data: {{"message": "now time:{0} value:{1}"}}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'), value)
                yield msg
        # 返回 StreamingHttpResponse,Content-Type 设置为 text/event-stream
        response = StreamingHttpResponse(event_stream(), content_type='text/event-stream',
                                         headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'})
        return response

启动服务

为了让项目异步运行,我们需要使用ASGI来启动项目,ASGI配置可参考文档:Django新特性汇总-崔亮的博客 (cuiliangblog.cn)。

uvicorn DRF.asgi:application --reload

访问验证

接下来使用浏览器访问api接口地址,浏览器会持续不断的接收响应类型为text/event-stream的最新实时数据并打印。

Nginx中间件SSE

为了保证SSE实时推送,需要对Nginx配置确保禁用缓冲区并确保Nginx不会中断SSE流,确保Nginx可以正确处理SSE连接。

基础反向代理配置

首先配置一个基础的Nginx反向代理,用于反向代理Django应用。

server {
    listen 80;
    server_name ~^.*$;

    location / {
        proxy_pass http://drf:8000;
        proxy_set_header Host $host;
  	    proxy_set_header X-Real-IP $remote_addr;
  	    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

为SSE配置特定的location

为了让Nginx正确处理SSE,需要禁用Nginx的响应缓冲区,并启用HTTP/1.1协议支持(SSE依赖此协议的持续连接功能)。

server {
    listen 80;
    server_name ~^.*$;

    location /v1/sse/ {
        # 代理到Django应用
        proxy_pass http://drf:8000;
        # 禁用响应缓冲,确保数据流立即发送到客户端
        proxy_buffering off;
        # 允许Nginx处理的最大响应头的大小(防止头部被缓存)
        proxy_cache_bypass $http_upgrade;
        # HTTP/1.1 协议支持,防止默认使用HTTP/1.0
        proxy_http_version 1.1;
        # 防止Nginx在长连接时设置 "Connection: close"
        proxy_set_header Connection '';
        # 设置请求头
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        # 其他反向代理配置
        ……
    }
}

通过这个配置,Nginx可以正确处理和代理Django Rest Framework中的SSE流,并且不会因为默认的缓冲机制导致SSE数据延迟。这样,前端Vue.js应用就可以实时接收来自后端的SSE推送消息。

VUE前端SSE

前端实现SSE可以使用浏览器的原生API——EventSource来实现。但是EventSource并不支持直接设置自定义的 HTTP 请求头,如 Accept 或 Authorization。如果使用需要自定义请求头,推荐使用 @microsoft/fetch-event-source。

这个库基于 Fetch API 实现,提供了比原生 EventSource 更灵活的控制,可以自定义 headers 和其他配置。并内置了自动重连机制,可以轻松地实现高级的错误处理逻辑。如果连接断开,浏览器会自动重试。

仓库地址:Azure/fetch-event-source: A better API for making Event Source requests, with all the features of fetch() (github.com)

封装请求函数

  • src/api/home.js
// 获取SSE数据
export function getSSE()  {
  return import.meta.env.VITE_APP_BASE_URL + '/sse/demo/'
}

配置路由

  • src/router/index.js
import {createRouter, createWebHistory} from 'vue-router';

const router = createRouter({
    history: createWebHistory(),  //h5模式createWebHistory
    routes: [
       ……
        {
            path: '/test',
            component: () => import('@/views/Test.vue'),
            meta: {
                title: '测试',
            }
        }
    ]
})
// 路由导航守卫
router.beforeEach((to, from, next) => {
    document.title = to.meta.title
    next()
})
export default router;

创建测试页

在 @microsoft/fetch-event-source 中,主要使用 fetchEventSource 函数来创建一个新的 EventSource 连接。这个函数接受一个 URL 参数,以及一个配置对象,其中可以包含一些选项,如请求方法、请求头、请求体等。当服务器向客户端推送事件时,可以通过 onmessage 回调函数来处理这些事件。此外,还可以提供 onerror 和 onclose 回调函数来处理连接错误和关闭事件。

  • src/views/Test.vue
<template>
  <h1>这是测试页</h1>
  <p v-for="item in data">
    {{ item.message }}
  </p>
</template>

<script setup>
import {ref, onMounted, onUnmounted} from "vue";
import {fetchEventSource} from '@microsoft/fetch-event-source';
import {getSSE} from "@/api/home";

const data = ref([])
const connectSSE = async () => {
  const url = getSSE()
  console.log(url)
  await fetchEventSource(url, {
    method: 'GET',
    headers: {
      'Accept': '*/*'
    },
    onmessage: async (event) => {
      console.log(event)
      console.log(JSON.parse(event.data))
      data.value.push(JSON.parse(event.data))
    },
    onerror(err) {
      console.error('Error:', err);
      if (err.status === 500) {
        // 服务器错误时重新连接
        setTimeout(() => connectSSE(), 5000);
      }
    },
    onopen(response) {
      if (response.ok) {
        console.log('Connection start');
      }
    },
    onclose() {
      console.log('Connection close');
    }
  })
}
onMounted(() => {
  connectSSE();
});
onUnmounted(() => {

});


</script>

<style scoped lang="scss">

</style>

访问验证

接下来访问vue测试页面路由,查看控制台信息验证,可以持续打印最新的数据流内容。

查看更多

微信公众号

微信公众号同步更新,欢迎关注微信公众号《崔亮的博客》第一时间获取最近文章。

博客网站

崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn


http://www.kler.cn/a/302960.html

相关文章:

  • uniapp 微信小程序 editor 富文本编辑器
  • 使用 Helm 安装 Redis 集群
  • 《计算机网络》课后探研题书面报告_网际校验和算法
  • WOA-CNN-GRU-Attention、CNN-GRU-Attention、WOA-CNN-GRU、CNN-GRU四模型对比多变量时序预测
  • unity学习18:unity里的 Debug.Log相关
  • 51c大模型~合集106
  • Redis Sentinel(哨兵)详解
  • 从JVM角度看对象创建过程和内存布局
  • 基于微信小程序+Java+SSM+Vue+MySQL的宿舍管理系统
  • F1C100S/F1C200S的资料来源说明
  • 快速切换淘宝最新镜像源npm
  • js 时间戳转日期格式
  • 【STM32笔记】STM32CubeIDE介绍
  • 漫谈设计模式 [18]:策略模式
  • ISO 21434与网络安全管理系统(CSMS)的协同作用
  • Java 入门指南:Java 并发编程 —— 同步工具类 Exchanger(交换器)
  • 学生请假管理系统
  • pytest钩子函数
  • Ubuntu22.04安装nginx
  • SpringBoot项目用Aspose-Words将Word转换为PDF文件正常显示中文的正确姿势
  • RP2040 C SDK clocks时钟源配置使用
  • 【Kubernetes】K8s 的鉴权管理(二):基于属性 / 节点 / Webhook 的访问控制
  • 《PhysDiff: Physics-Guided Human Motion Diffusion Model》ICCV2023
  • Rust使用Actix-web和SeaORM库开发WebAPI通过Swagger UI查看接口文档
  • 若依框架使用MyBatis-Plus中的baseMapper的方法报错Invalid bound statement (not found):
  • 中电金信:金融级数字底座“源启”:打造新型数字基础设施 筑牢千行百业数字化转型发展基石