SSE (Server-Sent Events) 服务器实时推送详解
Server-Sent Events
- 一、什么是 SSE ?
- 二、SSE 的工作原理
- 三、SSE 的基本配置
- 1.HTTP 请求和响应头设置
- 2.SSE 字段介绍
- 3.SSE 事件数据流示例
- 四、SseEmitter 的基本配置
- 1.SseEmitter 介绍及用法
- 2.使用 SseEmitter 示例1
- 1)编写核心 SSE Client
- 2)编写 Controller
- 3)前端接收与处理
- 3.使用 SseEmitter 示例2
- 1)后端实现SSE
- 2)前端接入SSE
- 五、注意事项
- 1.断开连接时
- 2.nginx配置
- 3.广播推送
- 3.安全问题
- 1)防止 XSS 攻击
- 2)验证连接请求
- 3)限制连接数量
- 4)限制连接数量
- 5)监控和日志记录
- 6)实施访问控制
- 4.服务端资源消耗
- 1)连接开销
- 2)并发连接
- 3)状态管理
- 4)内存泄漏防护
- 5)日志和监控
- 6)优化策略
一、什么是 SSE ?
Server-sent Events
(SSE)是一种轻量级的服务器主动向客户端单向推送实时数据的技术。
与 WebSocket 不同的是,服务器发送事件是单向的。数据消息只能从服务端到发送到客户端(如用户的浏览器)。这使其成为不需要从客户端往服务器发送消息的情况下的最佳选择。二者的主要区别:
SSE | WebSocket | |
---|---|---|
通信 | 单向通信 | 双向通信 |
协议 | HTTP | WebSocket |
自动重连 | 支持 | 不支持,需要客户端自行支持 |
数据格式 | 文本格式,如果需要二进制数据,需要自行编码 | 默认二进制数据,支持文本格式 |
浏览器支持 | 大部分支持,早期 Edge 浏览器、IE浏览器不支持 | 主流浏览器(包括移动端)的支持较好 |
SSE 常见推送场景有:微信消息通知栏、新闻推送、外卖状态 等等,我们自身的推送场景有:下载、连线请求、直播提醒 …
二、SSE 的工作原理
sse 的工作原理基于标准的 HTTP 协议,客户端通过发送一个特殊的 HTTP GET 请求到服务器,请求中包含 Accept: text/event-stream 头,表明客户端期望接收 SSE 数据流
。
服务器响应后保持连接打开,并可以持续向客户端推送数据。数据流由一系列事件组成,每个事件都包含事件类型、数据内容和事件 ID 等信息,客户端可以使用 JavaScript 中的 EventSource 接口来监听服务器发送的事件,并进行相应的处理。
三、SSE 的基本配置
1.HTTP 请求和响应头设置
在 sse 中,客户端首先向服务器发送一个 HTTP GET 请求,表明客户端准备接收 SSE 数据流,在服务器响应的时候,需要设置特定的响应头来告知客户端这是一个 SSE 流:
- Content-Type : text/event-stream:告诉浏览器这个响应是SSE流,浏览器应该以这种方式处理接收到的数据。
- Character-Encoding : UTF-8:设置编码格式。
- Cache-Control : no-cache:指示浏览器不应该缓存此响应。对于SSE来说,这是很重要的,因为我们希望实时更新数据,而不希望浏览器缓存旧的数据。
- Connection : keep-alive:保持连接打开,以便服务器可以持续发送数据。
2.SSE 字段介绍
SSE 数据流由一系列的字段组成,每个字段都以键值对的形式出现,字段之间用换行符分隔:
- event: <event_name>:可选字段,用于指定事件的名称,message是默认的事件名称。
- data:必须字段,包含事件的数据内容,可以有多行,每行都以data:开头。
- id:可选字段,提供一个唯一的标识符给事件,可用于断线重连和消息追踪。
- retry:可选字段,指定客户端在连接断开后重连的间隔时间。
3.SSE 事件数据流示例
//SSE测试
@GetMapping(value = "ssePush")
public void ssePush(HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
for (int i = 0; i < 10; i++) {
// 数据格式:
// id字段是可选的,用于指定事件的标识符;
// event字段是可选的,用于指定事件的名称;
// data字段是必须的,用于指定数据的内容;
// retry字段是可选的,用于指定客户端在连接断开后重新连接的时间间隔(以毫秒为单位)。
// 每个字段都必须以换行符(\n)结尾,并且每个消息都必须以两个换行符(\n\n)结尾。
String message = "Hello, world" + i;
response.getWriter().write("id:"+i+"\n");
response.getWriter().write("event:me\n");
response.getWriter().write("data:" + i + "\n\n");
response.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
四、SseEmitter 的基本配置
SseEmitter
是 Spring Framework 提供的一个类,用于实现 SSE(Server-Sent Events)。是一种基于 Servlet API 的机制,通过 HTTP 响应流(ResponseBody)来持续发送消息。
1.SseEmitter 介绍及用法
- 构造方法
SseEmitter()
:创建一个新的 SseEmitter 实例,使用默认的超时值。SseEmitter(Long timeout)
:创建一个新的 SseEmitter 实例,设置指定的超时时间(毫秒)。
- 发送数据
send(Object data)
:发送数据到客户端。send(Object data, MediaType mediaType)
:发送数据到客户端,并指定数据的媒体类型。send(SseEvent event)
:发送一个 SseEvent 对象到客户端。
- 关闭连接
complete()
:正常完成事件流,关闭连接。completeWithError(Throwable throwable)
:由于错误完成事件流,并关闭连接。completeWithError(String message)
:由于错误完成事件流,并关闭连接,提供错误信息。
- 连接状态处理
onCompletion(Runnable callback)
:注册连接完成的回调函数。onTimeout(Runnable callback)
:注册连接超时的回调函数。
- 获取超时时间
getTimeout()
:返回当前的超时时间(毫秒)。
- 其他
isCompleted()
:检查 SseEmitter 是否已完成。isExpired()
:检查 SseEmitter 是否已过期。
2.使用 SseEmitter 示例1
1)编写核心 SSE Client
-
创建 SSE 端点
创建一个 SseEmitter,用 uid 进行标识,uid 可以是用户标识符,也可以是业务标识符。可以理解为通信信道标识。
-
通过端点发送事件
可以定时或在时间发生是调用 SseEmitter.send() 方法来发送事件。
-
关闭端点连接
@Slf4j
@Component
public class SseClient {
private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建连接
*/
public SseEmitter createSse(String uid) {
//默认30秒超时,设置为0L则永不超时
SseEmitter sseEmitter = new SseEmitter(0l);
//完成后回调
sseEmitter.onCompletion(() -> {
log.info("[{}]结束连接...................", uid);
sseEmitterMap.remove(uid);
});
//超时回调
sseEmitter.onTimeout(() -> {
log.info("[{}]连接超时...................", uid);
});
//异常回调
sseEmitter.onError(
throwable -> {
try {
log.info("[{}]连接异常,{}", uid, throwable.toString());
sseEmitter.send(SseEmitter.event()
.id(uid)
.name("发生异常!")
.data("发生异常请重试!")
.reconnectTime(3000));
sseEmitterMap.put(uid, sseEmitter);
} catch (IOException e) {
e.printStackTrace();
}
}
);
try {
sseEmitter.send(SseEmitter.event().reconnectTime(5000));
} catch (IOException e) {
e.printStackTrace();
}
sseEmitterMap.put(uid, sseEmitter);
log.info("[{}]创建sse连接成功!", uid);
return sseEmitter;
}
/**
* 给指定用户发送消息
*
*/
public boolean sendMessage(String uid,String messageId, String message) {
if (StrUtil.isBlank(message)) {
log.info("参数异常,msg为null", uid);
return false;
}
SseEmitter sseEmitter = sseEmitterMap.get(uid);
if (sseEmitter == null) {
log.info("消息推送失败uid:[{}],没有创建连接,请重试。", uid);
return false;
}
try {
sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
log.info("用户{},消息id:{},推送成功:{}", uid,messageId, message);
return true;
}catch (Exception e) {
sseEmitterMap.remove(uid);
log.info("用户{},消息id:{},推送异常:{}", uid,messageId, e.getMessage());
sseEmitter.complete();
return false;
}
}
/**
* 断开
* @param uid
*/
public void closeSse(String uid){
if (sseEmitterMap.containsKey(uid)) {
SseEmitter sseEmitter = sseEmitterMap.get(uid);
sseEmitter.complete();
sseEmitterMap.remove(uid);
}else {
log.info("用户{} 连接已关闭",uid);
}
}
}
2)编写 Controller
- 打开页面默认页面,传递端点标识
- 连接端点(/createSse),页面需要使用
- 通过 ajax(/sendMsg),触发后端业务,向页面发送消息
- 主动关闭连接(/closeSse)
@Controller
public class IndexAction {
@Autowired
private SseClient sseClient;
@GetMapping("/")
public String index(ModelMap model) {
String uid = IdUtil.fastUUID();
model.put("uid",uid);
return "index";
}
@CrossOrigin
@GetMapping("/createSse")
public SseEmitter createConnect(String uid) {
return sseClient.createSse(uid);
}
@CrossOrigin
@GetMapping("/sendMsg")
@ResponseBody
public String sseChat(String uid) {
for (int i = 0; i < 10; i++) {
sseClient.sendMessage(uid, "no"+i,IdUtil.fastUUID());
}
return "ok";
}
/**
* 关闭连接
*/
@CrossOrigin
@GetMapping("/closeSse")
public void closeConnect(String uid ){
sseClient.closeSse(uid);
}
}
3)前端接收与处理
前端每接收到一次SSE推送的事件,就会在id为"con"的元素中追加数据。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div id="con"></div>
<script>
let chat = document.getElementById("con");
if (window.EventSource) {
//创建sse
eventSource = new EventSource(`/createSse?uid=${uid}`);
eventSource.onopen = function (event) {
console.log('SSE链接成功');
}
eventSource.onmessage = function (event) {
if(event.data){
chat.innerHTML += event.data + '<br/>';
//console.log('后端返回的数据:', data.value);
}
}
eventSource.onerror = (error) => {
console.log('SSE链接失败');
};
} else {
alert("你的浏览器不支持SSE");
}
</script>
</body>
</html>
3.使用 SseEmitter 示例2
1)后端实现SSE
@RestController
public class SseController {
@GetMapping("/stream")
public SseEmitter handleSse(HttpServletResponse response) {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
new Thread(() -> { // 创建一个新的线程
try {
for (int i = 0; i < 10; i++) {
String message = "Hello, world" + i;
emitter.send(SseEmitter.event()
.id(i + "")
.name("message")
.data(message));
Thread.sleep(1000); // 每秒发送一条消息
}
emitter.complete(); // 发送完毕后关闭连接
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e); // 错误完成事件流,并关闭连接
}
}).start();
return emitter;
}
}
2)前端接入SSE
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Client</title>
</head>
<body>
<h1>Server-Sent Events Client</h1>
<pre id="output"></pre>
<button id="closeButton">Close Connection</button>
<script>
const eventSource = new EventSource('<http://localhost:3000/stream>');
const output = document.getElementById('output');
const closeButton = document.getElementById('closeButton');
eventSource.onmessage = function(event) {
output.textContent += event.data + '\\n';
};
eventSource.onerror = function(event) {
console.error('EventSource failed: ', event);
eventSource.close(); // 可以选择在发生错误时关闭连接
};
eventSource.addEventListener('end', function(event) {
console.log('Server closed the connection: ', event);
eventSource.close();
});
closeButton.addEventListener('click', function() {
eventSource.close();
});
</script>
</body>
</html>
五、注意事项
1.断开连接时
当客户端断开连接时,SseEmitter 会抛出 IOException,所以务必捕获并处理这种异常,通常情况下我们会调用 emitter.complete() 或 emitter.completeWithError() 来关闭 SseEmitter。
2.nginx配置
这里记录一个踩坑情况:在我没有配置 nginx 时,调用 SSE 接口,通过IP+端口访问与直接通过域名访问是不一样的。由于没有配置 nginx,域名访问的接口会等待所有消息发送后,全部一起在页面展示。而IP+端口则会一条条的展示消息。所以大家遇到类似情况记得配置 nginx。如下:
3.广播推送
我们以「文件下载」功能进行说明,一般情况下,大文件的下载,服务端压力比较大、处理时间也比较长,为了有更好的交互体验,我们可以使用异步处理,服务端处理完了之后主动通知 客户端。
下载完成后,我们需要将完成事件推送给客户端。需要注意的是,由于服务是集群部署、SSE 连接在节点本地 Map 维护,这就有可能导致当前客户端的 SSE连接所在节点 与 事件推送节点 是两个独立的节点。
因此,我们这里借助于 Redis 的发布/订阅能力,将消息广播出去,能匹配连接的节点负责将消息推送至客户端、其他节点直接丢弃即可。效果图如下:
能否做到精准投递?
可以的,我们可以这样:
借助 Redis 做中心存储,存储 Map <用户, 节点IP> 这样的映射关系。
在推送消息之前,先通过映射关系找到该用户的 SSE 连接所在节点,然后通过 RPC 调用,直接将消息投递到对应的服务节点,最后由该节点进行事件推送。
一般情况下,我们可以用「广播」这种简单粗暴的方式应对大部分场景,毕竟「精准投递」需要中心化维护节点关系、应对节点变更等,处理起来稍显麻烦。当然,具体视业务场景来做选择即可。
3.安全问题
1)防止 XSS 攻击
由于 SSE 允许服务器动态地向客户端页面发送数据,如果不正确处理,可能会成为 XSS 攻击的载体。确保对所有接收到的数据进行适当的清理和编码,避免直接插入到 DOM 中。
eventSource.onmessage = function(event) {
const safeData = encodeURI(event.data); // 对数据进行URL编码
const messageElement = document.createElement('div');
messageElement.textContent = safeData; // 安全地将数据添加到页面
document.getElementById('messages').appendChild(messageElement);
};
2)验证连接请求
验证所有SSE连接请求,确保它们来自可信的源。可以通过检查Referer头或使用身份验证令牌来实现。
// 检查请求来源
String refererHost = request.getHeader("Referer");
if (refererHost == null || !refererHost.contains("trusted-domain.com")) {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
return;
}
3)限制连接数量
为了防止资源耗尽攻击,服务器应该限制每个客户端可以建立的SSE连接数量。这可以通过在服务器端设置最大连接数来实现。
4)限制连接数量
为了防止资源耗尽攻击,服务器应该限制每个客户端可以建立的SSE连接数量。这可以通过在服务器端设置最大连接数来实现。
5)监控和日志记录
启用详细的日志记录和监控机制,以便在发生安全事件时快速响应。记录所有SSE连接的元数据,如IP地址、连接时间等。
6)实施访问控制
使用适当的访问控制策略,确保只有授权用户才能接收敏感数据。这可能涉及到用户认证和授权机制。
4.服务端资源消耗
1)连接开销
SSE通过保持HTTP连接打开来实现服务器向客户端的持续数据推送。这意味着服务器需要为每个SSE连接分配内存和资源,用于维护连接状态和数据缓冲 在Java中,可以使用线程或异步处理来管理SSE连接,但需要注意资源的合理分配和回收。
@GetMapping("/stream")
public SseEmitter handleSseRequest(HttpServletRequest request) {
SseEmitter emitter = new SseEmitter();
// 添加资源清理逻辑
emitter.onCompletion(() -> {
// 清理资源
});
return emitter;
}
2)并发连接
当大量客户端同时连接到服务器时,服务器需要处理的并发连接数增加,这会显著增加CPU和内存的使用率。 可以使用线程池来控制并发量,例如在Spring框架中配置线程池:
@Configuration
public class AsyncConfig {
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("SSE-Executor-");
return executor;
}
}
3)状态管理
服务器需要维护每个SSE连接的状态,包括发送的数据、重连尝试等。状态管理的复杂性随着连接数的增加而增加。 可以使用数据库或缓存来存储和管理SSE连接状态:
// 伪代码,展示如何存储和检索SSE连接状态
ConnectionState state = connectionStateRepository.findByConnectionId(connectionId);
state.updateWithData(latestData);
connectionStateRepository.save(state);
4)内存泄漏防护
长时间运行的SSE连接可能会导致内存泄漏,特别是如果不正确地管理事件监听器和相关资源。 确保在连接关闭时清理所有资源:
emitter.onCompletion(() -> {
// 清理内存,取消定时器,关闭数据库连接等
});
5)日志和监控
适当的日志记录和监控可以帮助识别和解决资源消耗问题。 实现自定义的日志记录和监控逻辑:
@GetMapping("/stream")
public SseEmitter handleSseRequest(HttpServletRequest request) {
SseEmitter emitter = new SseEmitter();
emitter.onTimeout(() -> log.warn("SSE connection timed out"));
emitter.onCompletion(() -> log.info("SSE connection completed"));
return emitter;
}
6)优化策略
- 「连接复用」:尽可能复用现有的连接,减少连接建立和关闭的开销
- 「批量发送」:如果可能,批量发送数据而不是单个事件,减少数据包的数量
- 「使用高效的序列化」:选择高效的数据序列化方法,减少数据传输的大小
- 「超时和自动重连」:合理设置超时时间和自动重连策略,避免不必要的资源浪费
好事定律:每件事最后都会是好事,如果不是好事,说明还没到最后。