当前位置: 首页 > article >正文

SpringBoot使用WebSocket收发实时离线消息

引入maven依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

 WebScoket配置处理器


import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;


/**
 * WebScoket配置处理器
 */
@Configuration
public class WebSocketConfig implements ServletContextInitializer {
	 /**
     * ServerEndpointExporter 作用
     *
     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     *
     * @return
     */
	@Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    //设置websocket发送内容长度
    @Override
    public void onStartup(ServletContext servletContext)  {
        servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");
    }
}

webScoket消息对象

import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Date;

/**
* @author: ws
* @date: 20223/10/26 15:59
* @Description: WebSocketMessage
*/
@Data
public class WebSocketMessage {

/**
* 用户ID
*/
private String fromId;

/**
* 对方ID
*/
private String toOtherId;
//消息内容
private String message;

//发送时间
@JSONField(format="yyyy-MM-dd HH:mm:ss")
public Date date;

}

WebSocket操作类

import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.ws.wxyinghang.entity.WebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author: ws
 * @date: 20223/10/26 15:59
 * @Description: WebSocket操作类
 */
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketSever {

    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String userId;


    // session集合,存放对应的session
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
    // 用于存放离线消息
    private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();

    /**
     * 建立WebSocket连接
     *
     * @param session
     * @param userId  用户ID
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        log.info("WebSocket建立连接中,连接用户ID:{}", userId);
        try {
            Session historySession = sessionPool.get(userId);
            // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
            if (historySession != null) {
                webSocketSet.remove(historySession);
                historySession.close();
            }
        } catch (IOException e) {
            log.error("重复登录异常,错误信息:" + e.getMessage(), e);
        }
        // 建立连接
        this.session = session;
        this.userId = userId;
        webSocketSet.add(this);
        sessionPool.put(userId, session);
        //从离线消息队列里面获取消息
        if (offlineMessageMap.containsKey(userId)) {
            List<WebSocketMessage> list = offlineMessageMap.get(userId);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Object x = it.next();
                //离线消息接收成功后删除消息
                Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));
                if (bb) {
                    System.out.println("从队列中删除离线消息" + x);
                    it.remove();
                }
            }
            offlineMessageMap.remove(userId);
        }
        log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());
    }

    /**
     * 发生错误
     *
     * @param throwable e
     */
    @OnError
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    /**
     * 连接关闭
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        sessionPool.remove(this.userId);
        log.info("连接断开,当前在线人数为:{}", webSocketSet.size());
    }

    /**
     * 接收客户端消息
     *
     * @param message 接收的消息
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("收到客户端发来的消息:{}", message);
        sendMessageByUser(message);
    }

    /**
     * 推送消息到指定用户
     *
     * @param message 发送的消息
     */
    public static Boolean sendMessageByUser(String message) {
        WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
        log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
        Session session = sessionPool.get(msg.getToOtherId());
        //判断session是否正常
        if (session == null || !session.isOpen()) {
            log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");
            if (offlineMessageMap.containsKey(msg.getToOtherId())) {
                List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());
                list.add(msg);
                offlineMessageMap.put(msg.getToOtherId(), list);
            } else {
                offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));
            }
        }//发送消息
        else {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
                return false;
            }
        }
        return true;
    }

    //发送离线消息
    public static Boolean sendOfflineMessageByUser(String message) {
        WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
        log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
        Session session = sessionPool.get(msg.getToOtherId());
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * 群发消息
     *
     * @param message 发送的消息
     */
    public static void sendAllMessage(String message) {
        log.info("发送消息:{}", message);
        for (WebSocketSever webSocket : webSocketSet) {
            try {
                webSocket.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("群发消息发生错误:" + e.getMessage(), e);
            }
        }
    }

}

启动项目,使用apiFox测试,新建webScoket接口

新建websocket1,连接后发送消息 

 

新建webScoket2 ,可以看到连接后接收到了消息 

 

如果webScoket2断开连接后, webScoket1继续发送消息,等webScoket2连接后就会收到离线的消息。


http://www.kler.cn/a/107831.html

相关文章:

  • git没有识别出大写字母改成小写重命名的文件目录
  • 「Mac玩转仓颉内测版12」PTA刷题篇3 - L1-003 个位数统计
  • Linux系统编程多线程之条件变量和信号量讲解
  • SQL面试题——奔驰SQL面试题 车辆在不同驾驶模式下的时间
  • 微服务各组件整合
  • 服务器显卡和桌面pc显卡有什么不同
  • 【理论知识:Window Aggregation】flink 窗口聚合功能概述:两种窗口聚合模式的使用例子、功能说明
  • 【Codeforces】 CF582D Number of Binominal Coefficients
  • 浅谈 MySQL 主从复制,优点?原理?
  • Spring Security漏洞防护—HttpFirewall和 HTTPS
  • StripedFly恶意软件框架感染了100万台Windows和Linux主机
  • 0基础学习PyFlink——用户自定义函数之UDTAF
  • Git 拉取远程更新报错
  • Linux音频-基本概念
  • 记录--vue3实现excel文件预览和打印
  • NewStarCTF2023week4-Nmap
  • 【华为OD:C++机试】Day-1
  • 已解决:conda找不到对应版本的cudnn如何解决?
  • K8s 部署 CNI 网络组件+k8s 多master集群部署+负载均衡
  • 猴子吃桃问题--C语言
  • 计算机操作系统重点概念整理-第五章 文件管理【期末复习|考研复习】
  • 不再受害:如何预防和应对.mallab勒索病毒攻击
  • Mac运行Docker报错
  • shell实现部署ftp提供共享yum仓库
  • QT中获取当前项目路径的写法
  • 已有wchar_t *类型的filepath,使用Qt打开该文件