当前位置: 首页 > article >正文

SpringBoot 搭建 SSE

参考链接

https://www.51cto.com/article/798001.html

了解一下SseEmitter(一)-CSDN博客

依赖

有默认的 springboot-web 依赖即可

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

SSEmitter API

方法

说明

onCompletion()

回调方法,连接完成(正常关闭)时触发(超时,异常,complete() 之后触发)

onTimeout()

回调方法,当连接超时时触发

onError()

指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。

completeWithError(e)

在发生错误时关闭连接,并以错误的形式告知客户端

complete()

表示数据发送完成后关闭连接

event()

创建一个事件对象,设置事件名称和数据

使用示例emitter.send(SseEmitter.event().name("message").data("数据!"));

onCompletion() 回调函数触发条件说明:他只会在连接关闭(正常关闭)后调用,可以用于执行一些清理工作。

触发条件:

  1. 调用 complete()
  2. 调用 completeWithError(e)
  3. 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()

服务端

controller

/***
 * @author feiXun
 * @create 2025/1/13 14:09
 **/
@RestController
@RequestMapping("/sse")
@CrossOrigin(origins = "*")
public class SseController {

    @Autowired
    private SseService sseService;

    @GetMapping("/connect")
    @ApiOperation(value = "SSE 客户端连接")
    public SseEmitter connect() throws IOException {
        return sseService.connect();
    }


    @GetMapping("/batchSend")
    @ApiOperation(value = "SSE 群发消息", notes = "SSE 群发消息, 目前用于测试,后期可以删除")
    public void batchSend(@RequestParam("message") Object message){
        sseService.batchSend(message);
    }

}

Service 接口

/***
 * @author feiXun
 * @create 2025/1/13 14:52
 **/
public interface SseService {

    /**
     * 连接
     */
    SseEmitter connect() throws IOException;

    /**
     * 批量发送消息
     */
    void batchSend(Object message);

    /**
     * 发送心跳包
     */
    void sendHeartBeat();


}

Service 实现类

/***
 * @author feiXun
 * @create 2025/1/13 14:52
 **/
@Slf4j
@Service
public class SseServiceImpl implements SseService {

    /**
     * 用于存储 sseEmitterList
     */
    private final List<SseEmitter> sseEmitterList = new CopyOnWriteArrayList<>();

    /**
     * 消息队列
     * 用于将错误的信息,没有 SSE 连接时发送的信息 保存起来
     * 在重新连接的时候推送给 sse 客户端
     */
    private final Queue<SseEmitter.SseEventBuilder> messages = new ConcurrentLinkedQueue<>();

    @Override
    public SseEmitter connect() throws IOException {
        // 0 表示无限长连接;其他:毫秒数,表示连接时长,比如 1000L,就是 1秒后断开连接
        SseEmitter emitter = new SseEmitter(0L);

        // sse 连接完成,准备释放
        emitter.onCompletion(completionCallBack(emitter));

        // 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
        emitter.onError(errorCallBack(emitter));

        // 添加 list,用于发送给多个 sse 客户端
        sseEmitterList.add(emitter);

        // 这里是将 发生错误时,没有 SSE 客户端时 的消息在发一遍
        while (!messages.isEmpty()){
            SseEmitter.SseEventBuilder poll = messages.poll();
            try {
                // 发送信息
                emitter.send(poll);
            } catch (IOException e) {
                // 将信息加入队列,下次连接的时候推送给客户端
                messages.add(poll);
                // 关闭连接并抛出异常给客户端
                emitter.completeWithError(e);
                break;
            }
        }

        // 发送一条 心跳包信息
        emitter.send(MrSseVo.buildHeartBeat());
        return emitter;
    }

    /**
     * 群发消息
     */
    @Override
    public void batchSend(Object message) {
        // 将消息加入消息队列,当有 SSE 连接时,从队列中取出信息推送给 sse 客户端
        if (sseEmitterList.isEmpty()){
            SseEmitter.SseEventBuilder data = builderMessage(message);
            messages.add(data);
            return;
        }
        sseEmitterList.forEach(sseEmitter -> {
            send(sseEmitter, message, false);
        });
    }

    @Override
    public void sendHeartBeat() {
        if (!sseEmitterList.isEmpty()){
            // 批量发送心跳包
            sseEmitterList.forEach(sseEmitter -> {
                send(sseEmitter, "ping", true);
            });
        }
    }

    /**
     * 发送消息
     * @param isPing 是否是心跳包
     */
    private void send(SseEmitter sseEmitter, Object message, boolean isPing){
        SseEmitter.SseEventBuilder data = builderMessage(message);
        try {
            // 发送信息
            sseEmitter.send(message);
        } catch (IOException e) {
            if (!isPing){
                // 将信息加入队列,下次连接的时候推送给客户端
                messages.add(data);
            }
            // 关闭连接并抛出异常给客户端
            sseEmitter.completeWithError(e);
        }
    }

    /**
     * SSE 连接完成 回调(连接已关闭,正准备释放)
     * 触发条件(前提: 客户端的连接没断开)
     * 1. 调用 complete()
     * 2. 调用 completeWithError(e)
     * 3. 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()
     */
    private Runnable completionCallBack(SseEmitter emitter){
        return ()->{
            log.info("连接已关闭,准备释放!");
            // 将 sse 连接 移除
            sseEmitterList.remove(emitter);
        };
    }

    /**
     * SSE 异常回调
     * 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
     */
    private Consumer<Throwable> errorCallBack(SseEmitter sseEmitter){
        return throwable -> {
            log.error("SSE 异常 {}", throwable.getMessage());
            // 关闭连接并抛出异常给客户端
            sseEmitter.completeWithError(throwable);
        };
    }

    /**
     * 构造消息
     */
    private SseEmitter.SseEventBuilder builderMessage(Object message){
        return SseEmitter.event().
                id(UUID.randomUUID().toString())
                .data(message, MediaType.APPLICATION_JSON);
    }

}

客户端

http://127.0.0.1:8000/sse/connect:这个接口用于跟 sse 服务端建立连接

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Client</title>
</head>
<body>
    SSE Client
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('http://127.0.0.1:8000/sse/connect');

        eventSource.onmessage = function(event) {
            const eventsDiv = document.getElementById('events');
            eventsDiv.innerHTML += `<p>${event.data}</p>`;
        };

        eventSource.onerror = function(err) {
            console.error("EventSource failed:", err);
        };
    </script>
</body>
</html>

测试

  1. 打开客户端 或者 浏览器访问 连接 接口(/sse/connect)
  2. 调用发送接口 (/sse/send)
  3. 查看网页等有没有出现相应的信息

例如:


http://www.kler.cn/a/512885.html

相关文章:

  • Vue3初学之Element Plus Dialog对话框,Message组件,MessageBox组件
  • Spring 中的事件驱动模型
  • SDL2:arm64下编译使用 -- SDL2多媒体库使用音频实例
  • 【Leetcode 热题 100】70. 爬楼梯
  • 利用R计算一般配合力(GCA)和特殊配合力(SCA)
  • 使用docker部署mysql和tomcat服务器发现的问题整理
  • Numpy基础01(Jupyter基本用法/Ndarray创建与基本操作)
  • vue.draggable 拖拽
  • 2025年国产化推进.NET跨平台应用框架推荐
  • MyBatis操作数据库(入门)
  • 【Java实现导出Excel使用EasyExcel快速实现数据下载到Excel功能】
  • Qt之QDjango-db的简单使用
  • 三格电子——MODBUS TCP 转 CANOpen 协议网关
  • 网络通信---MCU移植LWIP
  • 从零开始:使用 Brain.js 创建你的第一个神经网络(一)
  • Redis - 通用命令
  • Spring Boot 整合 PageHelper 实现分页功能
  • 线程池遇到未处理的异常会崩溃吗?
  • Redis的Windows版本安装以及可视化工具
  • PHP代码审计学习01
  • Github 2025-01-20 开源项目周报 Top15
  • Docker:基于自制openjdk8镜像 or 官方openjdk8镜像,制作tomcat镜像
  • Linux 时间操作详解
  • 什么是馈线自动化(FA)?其优点是什么?本文给出答案
  • 14,c++——继承
  • VSCode 使用默认profile打开文件