Node 使用 SSE 结合redis 推送数据(echarts 图表实时更新)
1、实时通信有哪些实现方式?
特性 | 轮询(Polling) | WebSocket | SSE (Server-Sent Events) |
---|---|---|---|
通信方向 | 单向(客户端 → 服务端) | 双向(客户端 ↔ 服务端) | 单向(服务端 → 客户端) |
连接方式 | 客户端定时发起HTTP请求 | 持久连接(基于TCP协议) | 持久连接(基于HTTP协议) |
实现复杂度 | 简单 | 较复杂(需处理握手、协议转换等) | 简单 |
实时性 | 低(依赖轮询间隔) | 高(支持实时双向通信) | 高(服务端实时推送) |
性能开销 | 高(频繁建立/关闭连接) | 低(单个持久连接) | 低(单个持久连接) |
示例场景 | 定时获取天气、股票数据 | 在线聊天、多人游戏、实时协作 | 新闻推送、通知、实时股票行情 |
- 轮询:开发简单,但是效率太低了,适合对实时性要求不高的场景。在项目中,基本是不推荐使用,这种做法比较 low。
- WebSocket:开发起来较为复杂。性能好,可以双向实时通信,适合需要交互的场景。推荐使用 Socket.IO 包来开发。
- SSE:开发简单。性能好,但只能单向实时通信,适合服务端向客户端主动推送数据的场景。
另外值得一提的是,现在各个 AI 平台的消息推送
,也都是使用SSE
来实现的。
2、SSE 的基础实现
// 设置正确的响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 定时发送数据
const intervalId = setInterval(() => {
const data = {
message: `当前时间是 ${new Date().toLocaleTimeString()}`,
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 2000);
// 处理客户端断开连接
req.on('close', () => {
clearInterval(intervalId); // 清除定时器
res.end(); // 结束响应
});
- 想要用
SSE来推送数据
,顶部
要按照这个格式来设置响应头
,明确指明为event-stream
。这样才能被识别成SSE。 中间
部分,简单的写了一个定时执行
。每隔两秒钟,计算一下当前的时间。- 然后注意了,这里用的是
res.write
,这是实现SSE的关键代码
。使用res.write可以不断的,分多次发送数据,而不需要一次性发送完整的响应
。 - 还有要注意的是,我们这里
发送数据的格式
,按照SSE的规则
,必须是以data:开头,以两个\n结束
。\n是换行的意思。 - 最下面,如果
连接断开了
,就停止定时器
,并结束响应
。
3、前端部分实现
SSE默认不支持在header中传递数据
,那就直接在URL里传token好了
// 定义管理员令牌,用于身份验证,在实际使用时需将 'xxxx' 替换为真实有效的令牌
const token = 'xxxx';
// 创建一个 EventSource 对象,用于建立与服务器的 SSE 连接
// 这里使用模板字符串将 token 拼接到请求的 URL 中,该 URL 指向服务器上处理订单流数据的接口
const eventSource = new EventSource(`http://localhost:3000/admin/charts/stream_order?token=${ token }`);
// 当服务器向客户端发送消息时,会触发 onmessage 事件
eventSource.onmessage = function (event) {
// 使用 try...catch 块来捕获可能出现的异常,例如数据解析失败的情况
try {
// 服务器发送的数据存储在 event.data 中,通常是 JSON 格式的字符串
// 使用 JSON.parse 方法将其解析为 JavaScript 对象
const data = JSON.parse(event.data);
// 将解析后的数据打印到控制台,方便调试和查看
console.log(data);
} catch (error) {
// 如果解析数据过程中出现错误,将错误信息打印到控制台
console.error('解析数据出错:', error);
}
};
// 当 SSE 连接出现错误时,会触发 onerror 事件
eventSource.onerror = function (error) {
// 将连接错误信息打印到控制台,方便调试和排查问题
console.error('SSE 连接出错:', error);
};
4、测试
检查
,选择网络
。然后刷新一下
,就能看到发出的请求
了
注意
:请求的状态是pending
,等待 2 秒钟
后,状态变为200
的时候我们再点进去。这是因为如果点早了
,浏览器还没有将它识别成SSE
。
5、 SSE 小结
- 在
Node
部分,设置好响应头
,并用res.write
不断的发数据出去就好。 前端
部分,用new EventSource
建立连接。并通过onmessage
事件,来接收数据。
6、实践
redis 封装
6.1、后端代码实现
6.1.1、新建 SSE 处理工具类
在根目录
新建streams
文件夹,里面再新建一个sse-handler.js
,用于处理服务器发送事件(Server-Sent Events, SSE)的工具类,代码如下:
const {setKey, getKey} = require('../utils/redis');
// 定义一个名为 SSEHandler 的类,用于处理服务器发送事件(SSE)的连接和数据广播
class SSEHandler {
// 构造函数,在创建 SSEHandler 实例时会自动调用
constructor() {
// 使用 ES6 的 Set 数据结构来存储与浏览器建立 SSE 连接的响应对象(res)
// Set 可以确保存储的元素唯一,避免重复
this.clients = new Set();
}
/**
* 初始化 SSE 数据流,处理新的客户端连接
* @param {Object} res - Express 响应对象,用于向客户端发送数据
* @param {Object} req - Express 请求对象,包含客户端的请求信息
*/
initStream(res, req) {
// 设置响应头,指定响应内容类型为 text/event-stream,这是 SSE 的标准内容类型
res.setHeader('Content-Type', 'text/event-stream');
// 设置缓存控制,禁止浏览器缓存响应内容,确保每次请求都能获取最新数据
res.setHeader('Cache-Control', 'no-cache');
// 设置连接类型为 keep-alive,保持与客户端的长连接
res.setHeader('Connection', 'keep-alive');
// 刷新响应头,将设置的响应头信息发送给客户端
res.flushHeaders();
// 将当前客户端的响应对象添加到 clients 集合中,以便后续广播数据时使用
this.clients.add(res);
// 监听客户端连接关闭事件,当客户端断开连接时触发回调函数
req.on('close', () => {
// 从 clients 集合中删除当前客户端的响应对象
this.clients.delete(res);
// 打印日志,提示客户端已断开连接
console.log('Client disconnected');
});
}
/**
* 向所有连接的客户端广播数据
* @param {Object} data - 要广播的数据对象,会被序列化为 JSON 字符串发送给客户端
*/
async broadcastData(data) {
await setKey('sse_broadcast_data', data);
// 遍历 clients 集合中的每个客户端响应对象
this.clients.forEach((client) => {
// 检查客户端响应是否已经结束,如果未结束则继续发送数据
if (!client.finished) {
// 按照 SSE 的格式,以 "data: " 开头,后面跟上 JSON 序列化后的数据,以两个换行符 "\n\n" 结尾
// 并将其写入客户端响应流,发送给客户端
client.write(`data: ${JSON.stringify(data)}\n\n`);
}
});
}
}
// 将 SSEHandler 类导出,以便其他模块可以引入和使用
module.exports = SSEHandler;
6.1.2、定义和管理统计查询的 SQL 语句·
创建utils/stats-query.js
文件,用于定义和管理统计查询的 SQL 语句:
const statsQueries = {
order: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Orders` GROUP BY `month` ORDER BY `month` ASC",
user: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Users` GROUP BY `month` ORDER BY `month` ASC",
// 可以添加更多类型的统计查询
};
function getStatsQuery(type) {
return statsQueries[type];
}
module.exports = {
getStatsQuery
};
6.1.3、 广播服务的实现
创建utils/broadcast-service.js
文件,实现广播服务:
const {sequelize} = require('../models');
const SSEHandler = require('../streams/sse-handler');
// 直接在 broadcast-service.js 中定义统计查询配置
const {getStatsQuery} = require('./stats-query');
// 存储不同类型的 SSE 处理程序
const sseHandlers = {};
async function broadcastStats(type) {
try {
if (!getStatsQuery(type)) {
console.error(`Invalid stats type: ${type}`);
return;
}
if (!sseHandlers[type]) {
sseHandlers[type] = new SSEHandler();
}
const [results] = await sequelize.query(getStatsQuery(type));
const data = {
months: results.map(item => item.month),
values: results.map(item => item.value)
};
sseHandlers[type].broadcastData(data);
console.log(`${type} stats broadcasted successfully`);
} catch (error) {
console.error(`Error broadcasting ${type} stats:`, error);
}
}
function initSSEStream(type, res, req) {
if (!getStatsQuery(type)) {
console.error(`Invalid stats type: ${type}`);
return;
}
if (!sseHandlers[type]) {
sseHandlers[type] = new SSEHandler();
}
sseHandlers[type].initStream(res, req);
}
module.exports = {
broadcastStats,
initSSEStream
};
6.1.4、新建 charts.js
文件实现 ECharts
路由(仅展示关键代码)
在这一部分,我们将创建一个 charts.js 文件,用于实现与 ECharts 相关的路由功能。该文件主要负责处理不同类型的统计数据请求,并利用 Redis 进行数据缓存,同时支持通过 SSE(Server-Sent Events)进行实时数据推送。
const {getStatsQuery} = require('../../utils/stats-query');
const {initSSEStream} = require('../../utils/broadcast-service');
const {getKey, setKey} = require('../../utils/redis');
// 获取统计数据的通用路由
// 此路由根据请求参数中的 type 来获取相应的统计数据
router.get('/:type', async (req, res) => {
// 从请求参数中获取统计数据的类型
const type = req.params.type;
// 根据类型获取对应的统计查询语句
const query = getStatsQuery(type);
// 如果未找到对应的查询语句,返回错误信息
if (!query) {
return failure(res, new Error('Invalid stats type'));
}
try {
// 首先尝试从 Redis 中获取缓存的数据
let cachedData = await getKey(`stats_data_${type}`);
// 如果 Redis 中存在缓存数据,直接返回该数据
if (cachedData) {
return success(res, 'Stats data fetched successfully', {data: cachedData});
}
// 若 Redis 中没有缓存数据,执行数据库查询
const [results] = await sequelize.query(query);
// 对查询结果进行处理,将月份和对应的值分别提取出来
const data = {
months: results.map(item => item.month),
values: results.map(item => item.value)
};
// 将处理后的数据存储到 Redis 中,以便后续请求可以直接使用缓存数据
await setKey(`stats_data_${type}`, data);
// 返回查询成功的响应,并将数据发送给客户端
success(res, 'Stats data fetched successfully', {data});
} catch (error) {
// 若出现错误,返回错误响应
failure(res, error);
}
});
/**
* SSE 统计不同类型数据
* GET /admin/charts/stream/:type
* 此路由用于处理 SSE 连接,实时推送不同类型的统计数据
*/
router.get('/stream/:type', async (req, res) => {
// 从请求参数中获取统计数据的类型
const type = req.params.type;
// 初始化 SSE 数据流,开始向客户端推送数据
initSSEStream(type, res, req);
});
6.1.5、表单新增数据
时 删除redis缓存
以order
为例
// 定义一个处理 POST 请求的路由,路径为根路径('/')
// 当客户端向该路径发送 POST 请求时,此中间件函数会被调用
// req 表示请求对象,包含客户端发送的请求信息
// res 表示响应对象,用于向客户端发送响应
// next 是 Express 中的中间件函数,用于将控制权传递给下一个中间件
router.post('/', async function (req, res, next) {
try {
// 生成一个唯一的订单号
// 使用 uuidv4 函数生成一个通用唯一识别码(UUID)
// 然后使用 replace 方法将 UUID 中的连字符(-)替换为空字符串,得到一个无连字符的订单号
const outTradeNo = uuidv4().replace(/-/g, '');
// 调用 getMembership 函数,根据请求对象 req 获取会员信息
// 这个函数可能会从数据库、缓存或其他数据源中获取与当前请求相关的会员信息
const membership = await getMembership(req);
// 使用 Sequelize 的 create 方法创建一个新的订单记录
// 传入一个包含订单信息的对象,这些信息将被插入到数据库的 Order 表中
const order = await Order.create({
// 订单号,使用前面生成的唯一订单号
outTradeNo: outTradeNo,
// 用户 ID,从请求对象中获取当前用户的 ID
userId: req.userId,
// 订单主题,使用会员的名称
subject: membership.name,
// 会员时长(月),从会员信息中获取会员的持续月数
membershipMonths: membership.durationMonths,
// 订单总金额,使用会员的价格
totalAmount: membership.price,
// 订单状态,初始状态设为 0,通常表示待支付
status: 0,
});
// 删除 Redis 中存储的订单统计数据缓存
// 当有新订单创建时,之前的统计数据可能不再准确,所以需要删除缓存
// 后续请求统计数据时会重新从数据库获取最新数据
await delKey('stats_data_order');
// 调用广播服务,将订单统计数据的更新广播出去
// 这可能会触发前端页面或其他相关服务更新订单统计信息
await broadcastStats('order');
// 调用 success 函数,向客户端发送成功响应
// 第一个参数是响应对象 res,用于发送响应
// 第二个参数是成功消息,告知客户端订单创建成功
// 第三个参数是包含订单信息的对象,客户端可以使用这些信息进行后续处理
success(res, '订单创建成功。', { order });
} catch (error) {
// 如果在订单创建过程中出现错误,调用 failure 函数向客户端发送错误响应
// 第一个参数是响应对象 res,用于发送响应
// 第二个参数是捕获到的错误对象,客户端可以根据错误信息进行相应处理
failure(res, error);
}
});
6.2、前端代码实现
6.2.1、前端charts
封装
在前端开发中,为了高效管理 ECharts 图表,我们封装了 ChartManager 类,它能处理图表的初始化、数据获取、SSE 连接以及错误处理等操作。以下是详细的代码及说明:
// 配置对象,可根据实际情况灵活修改,用于存储与图表管理相关的配置信息
const config = {
// API 请求的基础 URL,用于获取图表数据和建立 SSE 连接
API_BASE_URL: 'http://localhost:3000',
// 最大重试次数,当 SSE 连接失败时会进行重试,达到该次数后停止重试
MAX_RETRIES: 5,
// 重试延迟时间(毫秒),每次重试之间的间隔时长
RETRY_DELAY: 3000
};
/**
* ChartManager 类,负责管理 ECharts 图表的初始化、数据获取、SSE 连接以及错误处理等功能
*/
class ChartManager {
/**
* 构造函数,用于初始化图表管理器的基本属性
* @param {string} chartId - 图表容器的 ID,用于查找对应的 DOM 元素
* @param {string} type - 图表类型,例如 'order' 或 'user',决定获取数据的接口路径
* @param {string} token - 用于身份验证的令牌,在请求数据时使用
* @param {Object} [option={}] - 可选的 ECharts 配置选项,用于自定义图表样式
*/
constructor(chartId, type, token, option = {}) {
this.chartId = chartId;
this.type = type;
this.token = token;
this.chart = null;
this.initialDataFetched = false;
this.sseSource = null;
this.retryCount = 0;
this.maxRetries = config.MAX_RETRIES;
this.retryDelay = config.RETRY_DELAY;
this.option = {
title: {
text: `月度${type === 'order' ? '订单' : '用户'}统计`,
textStyle: { color: '#333' }
},
tooltip: {
trigger: 'axis',
axisPointer: { type: 'shadow' }
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: {
type: 'category',
data: [],
axisTick: { alignWithLabel: true }
},
yAxis: {
type: 'value'
},
series: [{
name: '数量',
type: 'bar',
barWidth: '60%',
data: []
}],
...option
};
}
/**
* 初始化图表,包含创建 ECharts 实例、获取初始数据以及建立 SSE 连接等操作
*/
async init() {
try {
const chartDom = document.getElementById(this.chartId);
if (!chartDom) {
throw new Error(`未找到指定 ID 的图表容器:${this.chartId}`);
}
this.chart = echarts.init(chartDom);
this.chart.setOption(this.option);
await this.fetchInitialData();
this.connectSSE();
} catch (error) {
console.error(`图表 ${this.chartId} 初始化失败:`, error);
this.showError('图表初始化失败,请尝试刷新页面重试', error.message);
}
}
/**
* 异步获取初始数据
*/
async fetchInitialData() {
try {
const url = `${config.API_BASE_URL}/admin/charts/${this.type}?token=${this.token}`;
const response = await this.fetchData(url);
const { data } = response;
if (data.data.months.length === 0) {
this.showError('当前暂无数据,10 秒后将尝试重新获取');
setTimeout(() => this.fetchInitialData(), 10000);
return;
}
if (data.data.months.length!== data.data.values.length) {
this.showError('数据格式存在错误:月份和数量数组长度不匹配');
return;
}
this.option.xAxis.data = data.data.months;
this.option.series[0].data = data.data.values;
this.initialDataFetched = true;
this.chart.setOption(this.option);
} catch (error) {
this.showError('数据加载失败,5 秒后将尝试重新加载', error.message);
setTimeout(() => this.fetchInitialData(), 5000);
}
}
/**
* 建立 SSE 连接,实时监听服务器发送的数据
*/
connectSSE() {
if (this.sseSource &&!this.sseSource.closed) {
this.sseSource.close();
}
const url = `${config.API_BASE_URL}/admin/charts/stream/${this.type}?token=${this.token}`;
this.sseSource = new EventSource(url);
this.sseSource.onmessage = (event) => {
try {
const responseData = JSON.parse(event.data);
const data = responseData.data || responseData;
if (!data ||!Array.isArray(data.months) ||!Array.isArray(data.values)) {
this.showError('SSE 返回的数据格式有误,请检查后端接口');
return;
}
if (data.months.length === 0) {
this.showError('暂无数据,请创建订单后再进行查看');
return;
}
if (data.months.length!== data.values.length) {
this.showError('SSE 数据格式错误:月份和数量数组长度不匹配');
return;
}
this.option.xAxis.data = data.months;
this.option.series[0].data = data.values;
this.chart.setOption(this.option);
} catch (parseError) {
console.error('解析 SSE 数据时出现错误:', parseError);
this.showError('实时数据解析失败,请检查网络连接或尝试刷新页面', parseError.message);
}
};
this.sseSource.onerror = (error) => {
console.error('SSE 连接出现错误:', error);
this.showError('实时数据连接中断,正在尝试重新连接...', error.message);
this.handleSseError();
};
}
/**
* 处理 SSE 连接错误,执行重试操作
*/
handleSseError() {
this.retryCount++;
if (this.retryCount <= this.maxRetries) {
setTimeout(() => this.connectSSE(), this.retryDelay);
} else {
this.showError('重连失败,请手动刷新页面');
this.retryCount = 0;
}
}
/**
* 发起网络请求获取数据
* @param {string} url - 请求的 URL
* @returns {Promise<Object>} - 返回解析后的 JSON 数据
*/
async fetchData(url) {
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP 请求失败,状态码:${response.status}`);
}
const contentType = response.headers.get('content-type');
if (!contentType ||!contentType.includes('application/json')) {
throw new Error('响应数据并非 JSON 格式');
}
return await response.json();
} catch (error) {
this.showError('网络请求出现问题', error.message);
throw error;
}
}
/**
* 显示错误信息并更新图表标题
* @param {string} message - 错误消息内容
* @param {string} [errorCode=null] - 可选的错误代码
*/
showError(message, errorCode = null) {
const errorDiv = document.createElement('div');
errorDiv.className = 'error-message';
errorDiv.innerHTML = `
${message}
${errorCode? `<small>错误码: ${errorCode}</small>` : ''}
<button onclick="this.parentElement.remove()">关闭</button>
`;
document.body.prepend(errorDiv);
this.option.series[0].data = [];
this.chart.setOption(this.option);
}
/**
* 销毁图表和 SSE 连接,释放相关资源
*/
destroy() {
if (this.chart) {
this.chart.dispose();
this.chart = null;
}
if (this.sseSource &&!this.sseSource.closed) {
this.sseSource.close();
}
}
}
// 将 ChartManager 类暴露到全局作用域,方便在其他地方使用
window.ChartManager = ChartManager;
6.2.1、前端demo
实现
面是一个前端示例,展示了如何使用 ChartManager 类创建实时统计图表:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<!-- 让页面在移动设备上能正确显示 -->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>实时统计图表</title>
<!-- 引入 ECharts 库,用于创建各种类型的图表 -->
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
<!-- 引入自定义的图表管理脚本,包含 ChartManager 类等逻辑 -->
<script src="charts.js"></script>
<style>
body {
/* 设置页面整体字体为 Arial 无衬线字体 */
font-family: Arial, sans-serif;
/* 使用 Flexbox 布局,让内容水平和垂直居中 */
display: flex;
justify-content: center;
align-items: center;
/* 让内容垂直排列 */
flex-direction: column;
/* 页面四周添加 20px 的外边距 */
margin: 20px;
}
.chart-container {
/* 设置图表容器的宽度为 800px */
width: 800px;
/* 设置图表容器的高度为 500px */
height: 500px;
/* 图表容器四周添加 20px 的外边距 */
margin: 20px;
/* 图表容器添加 1px 宽的浅灰色边框 */
border: 1px solid #eee;
/* 图表容器四个角设置 4px 的圆角 */
border-radius: 4px;
}
.error-message {
/* 错误消息文本颜色设置为红色 */
color: red;
/* 错误消息四周添加 20px 的外边距 */
margin: 20px;
/* 错误消息内容四周添加 10px 的内边距 */
padding: 10px;
/* 错误消息背景颜色设置为浅红色 */
background-color: #ffebee;
/* 错误消息框四个角设置 4px 的圆角 */
border-radius: 4px;
}
</style>
</head>
<body>
<!-- 订单统计图表的容器,后续 ECharts 图表会渲染到这个容器中 -->
<div class="chart-container" id="orderChart"></div>
<!-- 用户统计图表的容器,后续 ECharts 图表会渲染到这个容器中 -->
<div class="chart-container" id="userChart"></div>
<script>
// 当文档的 DOM 内容加载完成后执行以下逻辑
document.addEventListener('DOMContentLoaded', async () => {
try {
// 这里的 token 用于身份验证,实际使用时需要替换为从后端获取的真实 token
const token = '';
// 自定义的 ECharts 配置选项,可用于覆盖默认配置
const customOption = {
// 修改图表标题的文本颜色为蓝色
title: {
textStyle: { color: 'blue' }
}
};
// 创建订单统计图表的管理实例,传入容器 ID、图表类型、token 和自定义配置
const orderChart = new ChartManager('orderChart', 'order', token, customOption);
// 创建用户统计图表的管理实例,传入容器 ID、图表类型、token 和自定义配置
const userChart = new ChartManager('userChart', 'user', token, customOption);
// 使用 Promise.all 并行初始化订单图表和用户图表
await Promise.all([orderChart.init(), userChart.init()]);
// 监听窗口的 beforeunload 事件,当用户关闭或刷新页面时执行以下逻辑
window.addEventListener('beforeunload', () => {
// 销毁订单图表实例,释放相关资源
orderChart.destroy();
// 销毁用户图表实例,释放相关资源
userChart.destroy();
});
} catch (error) {
// 如果在初始化过程中出现错误,在控制台输出错误信息
console.error('图表初始化失败:', error);
// 调用 showGlobalError 函数显示全局错误消息
showGlobalError('图表初始化失败,请检查浏览器控制台');
}
});
/**
* 显示全局错误消息的函数
* @param {string} message - 要显示的错误消息内容
*/
function showGlobalError(message) {
// 创建一个新的 div 元素用于显示错误消息
const errorDiv = document.createElement('div');
// 为错误消息 div 元素添加 error-message 类名,以便应用相应的样式
errorDiv.className = 'error-message';
// 设置错误消息 div 元素的文本内容为传入的消息
errorDiv.textContent = message;
// 将错误消息 div 元素添加到页面 body 元素的最前面
document.body.prepend(errorDiv);
}
</script>
</body>
</html>