当前位置: 首页 > article >正文

SpringBoot + Websocket实现系统用户消息通知

1、引入依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.7</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

2、保存登录信息实体类

@Data
public class ClientInfoEntity {
    /**
     * 客户端连接的session
     */
    private Session session;
    /**
     * 连接存活时间
     */
    private LocalDateTime existTime;
}

3、SpringContextHolder

@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
    private static ApplicationContext applicationContext = null;
    private static Logger logger = LoggerFactory.getLogger(SpringContextHolder.class);

    public SpringContextHolder() {
    }

    public static <T> T getBean(Class<T> requiredType) {
        assertContextInjected();
        return applicationContext.getBean(requiredType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringContextHolder.applicationContext != null) {
            logger.info("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
        }

        SpringContextHolder.applicationContext = applicationContext;
    }

    @Override
    public void destroy() throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext);
        }
        applicationContext = null;
    }

    private static void assertContextInjected() {
    }
}

4、创建ServerEndpoint

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

// 该类负责监听客户端的连接、断开连接、接收消息、发送消息等操作
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/notice/websocket")
public class WsServerEndpoint {
    //key:accountName、sessionId
    private static Table<String, String, ClientInfoEntity> table = HashBasedTable.create();

    private static final int EXIST_TIME_HOUR = 2;
     // 注:spring bean生命周期与websocket不一样,不能直接注入
    @Autowired
    private TmsgNoticeRecordService tmsgNoticeRecordService = null;
    @Autowired
    private AuthClientUtils authClientUtils = null;

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        if (null == authClientUtils) {
            authClientUtils = SpringContextHolder.getBean(AuthClientUtils.class);
        }

        String accountName = null;
        Map<String, List<String>> params = session.getRequestParameterMap();
        List<String> tokens = params.get("token");
        if (CollectionUtils.isNotEmpty(tokens)) {
            try {
                accountName = authClientUtils.getCurrentAccountName(tokens.get(0));
            } catch (Exception e) {
                log.error("webSocket:", e);
            }
        }

        if (null == accountName) {
            log.info("WebSocket 连接建立失败");
            throw new RuntimeException("accountName获取失败,WebSocket 连接建立失败");
        }

        if (table.containsColumn(session.getId())) {
            log.info("token已建立连接");
            return;
        }
        if (null == tmsgNoticeRecordService) {
            tmsgNoticeRecordService = SpringContextHolder.getBean(TmsgNoticeRecordService.class);
        }

        //业务逻辑
        try {
            // 通过账号查询当前登录人接收到所有未读的消息
            List<Map<String,Object> list = tmsgNoticeRecordService.pushDataList(accountName);
            if (CollectionUtils.isNotEmpty(list)) {
                for (int i = 0; i < list.size(); i++) {
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("msg", list.get(i).get("msg"));
                    // 发送消息
                    session.getAsyncRemote().sendText(jsonObject.toJSONString());
                }
            }
        } catch (Exception e) {
            log.info("业务逻辑异常", e);
        }

        //把成功建立连接的会话在实体类中保存
        ClientInfoEntity entity = new ClientInfoEntity();
        entity.setSession(session);
        entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));

        table.put(accountName, session.getId(), entity);

        log.info("WebSocket 连接建立成功: " + accountName);
    }

    /**
     * 当断开连接时调用该方法
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        if (!table.isEmpty() && table.containsColumn(session.getId())) {
            Iterator<Map.Entry<String, ClientInfoEntity>> iterator = table.column(session.getId()).entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, ClientInfoEntity> rowValue = iterator.next();
                table.remove(rowValue.getKey(), session.getId());
                log.info("WebSocket 连接关闭成功");
            }
        }
    }

    /**
     * 接受消息
     * 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
     */
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        if (!table.isEmpty() && table.containsColumn(session.getId())) {
            Iterator<Map.Entry<String, ClientInfoEntity>> iterator = table.column(session.getId()).entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, ClientInfoEntity> cell = iterator.next();

                //如果是心跳包
                if ("heartbeat".equals(message)) {
                    //只要接受到客户端的消息就进行续命(时间)
                    cell.getValue().setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
                    if (cell.getValue().getSession().isOpen()) {
                        cell.getValue().getSession().getBasicRemote().sendText("heartbeat");
                    }
                    return;
                }

                //只要接受到客户端的消息就进行续命(时间)
                cell.getValue().setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
                if (cell.getValue().getSession().isOpen()) {
                    cell.getValue().getSession().getBasicRemote().sendText("heartbeat");
                }
            }
        }
    }

    /**
     * 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
     */
    @OnError
    public void onError(Throwable error) {
        log.error("webSocket报错信息:" + error.toString());
    }

    /**
     * 心跳 每20秒发送一次
     */
    @XxlJob("message.heartbeat")
    public ReturnT<String> heartbeat() {
        XxlJobHelper.log("心跳");
        if (!table.isEmpty()) {
            //超过存活时间进行删除
            Iterator<Table.Cell<String, String, ClientInfoEntity>> iterator = table.cellSet().iterator();
            while (iterator.hasNext()) {
                Table.Cell<String, String, ClientInfoEntity> cell = iterator.next();
                if (cell.getValue().getSession().isOpen()) {
                    cell.getValue().getSession().getAsyncRemote().sendText("heartbeat");
                } else {// 已经断开连接
                    table.remove(cell.getRowKey(), cell.getColumnKey());
                }
            }
        }
        return ReturnT.SUCCESS;
    }

    /**
     * @Description: 同账号多次登录,通过账号发送消息
     */
    private void send(String rowKey, String msg) {
        Iterator<Map.Entry<String, ClientInfoEntity>> columnIterator = table.row(rowKey).entrySet().iterator();
        while (columnIterator.hasNext()) {
            Map.Entry<String, ClientInfoEntity> column = columnIterator.next();
            if (column.getValue().getSession().isOpen()) {
                column.getValue().getSession().getAsyncRemote().sendText(msg);
            }
        }
    }
}

WebSocket配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@EnableWebSocket
@Configuration
public class WebSocketConfig {
    /**
     * 该方法用来创建并返回一个ServerEndpointExporter实例。
     * 这个实例的作用是扫描并自动配置所有使用@ServerEndpoint注解标记的WebSocket端点
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

启动应用程序并测试

1、启动类加@EnableWebSocket注解


2、使用websocket测试工具访问: ws://127.0.0.1:8080//notice/websocket?token=xxxxx

http://www.kler.cn/a/505565.html

相关文章:

  • Linux 查看内存命令
  • AWS云计算概览(自用留存)
  • 如何修复Android上未安装的应用程序
  • 关于在 Kotlin DSL 中,ndk 的配置方式
  • 【llama_factory】qwen2_vl训练与批量推理
  • 总结SpringBoot项目中读取resource目录下的文件多种方法
  • Web后端开发
  • 《零基础Go语言算法实战》【题目 4-6】随机选择单链表的一个节点并返回
  • 《零基础Go语言算法实战》【题目 2-20】通过泛型比较大小
  • 设计模式--命令模式【行为型模式】
  • 【JavaWeb01】JavaWeb开发基础:HTML的深度解析与应用
  • 【计算机网络】lab8 DNS协议
  • Postman下载与使用,新手详细
  • android T 建立文件夹及文件的记录
  • Unity自带的真车模拟系统,速度不够大r时如何以匀速上桥
  • c++ string 类实现
  • 【I/O编程】UNIX文件基础
  • 深度学习中PyTorch张量的重塑操作
  • 下载文件,浏览器阻止不安全下载
  • 前端如何设计一个回溯用户操作的方案
  • c++ 手写queue循环队列
  • Windows 上的 MySQL 8.4.3 和 WSL(Ubuntu)的 MySQL 8.0.40 之间配置 主从同步
  • linux系统监视(centos 7)
  • 数据结构9——二叉搜索树
  • 使用Struts2遇到的Context[项目名称]启动失败问题解决(Java Web学习笔记)
  • 虚拟线程JDK与Spring Core Reactor