SpringBoot使用WebSocket收发实时离线消息
引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebScoket配置处理器
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;
/**
* WebScoket配置处理器
*/
@Configuration
public class WebSocketConfig implements ServletContextInitializer {
/**
* ServerEndpointExporter 作用
*
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
//设置websocket发送内容长度
@Override
public void onStartup(ServletContext servletContext) {
servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");
}
}
webScoket消息对象
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Date;
/**
* @author: ws
* @date: 20223/10/26 15:59
* @Description: WebSocketMessage
*/
@Data
public class WebSocketMessage {
/**
* 用户ID
*/
private String fromId;
/**
* 对方ID
*/
private String toOtherId;
//消息内容
private String message;
//发送时间
@JSONField(format="yyyy-MM-dd HH:mm:ss")
public Date date;
}
WebSocket操作类
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.ws.wxyinghang.entity.WebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author: ws
* @date: 20223/10/26 15:59
* @Description: WebSocket操作类
*/
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketSever {
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private String userId;
// session集合,存放对应的session
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
// concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
// 用于存放离线消息
private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();
/**
* 建立WebSocket连接
*
* @param session
* @param userId 用户ID
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
log.info("WebSocket建立连接中,连接用户ID:{}", userId);
try {
Session historySession = sessionPool.get(userId);
// historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
if (historySession != null) {
webSocketSet.remove(historySession);
historySession.close();
}
} catch (IOException e) {
log.error("重复登录异常,错误信息:" + e.getMessage(), e);
}
// 建立连接
this.session = session;
this.userId = userId;
webSocketSet.add(this);
sessionPool.put(userId, session);
//从离线消息队列里面获取消息
if (offlineMessageMap.containsKey(userId)) {
List<WebSocketMessage> list = offlineMessageMap.get(userId);
Iterator it = list.iterator();
while (it.hasNext()) {
Object x = it.next();
//离线消息接收成功后删除消息
Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));
if (bb) {
System.out.println("从队列中删除离线消息" + x);
it.remove();
}
}
offlineMessageMap.remove(userId);
}
log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());
}
/**
* 发生错误
*
* @param throwable e
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
/**
* 连接关闭
*/
@OnClose
public void onClose() {
webSocketSet.remove(this);
sessionPool.remove(this.userId);
log.info("连接断开,当前在线人数为:{}", webSocketSet.size());
}
/**
* 接收客户端消息
*
* @param message 接收的消息
*/
@OnMessage
public void onMessage(String message) {
log.info("收到客户端发来的消息:{}", message);
sendMessageByUser(message);
}
/**
* 推送消息到指定用户
*
* @param message 发送的消息
*/
public static Boolean sendMessageByUser(String message) {
WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
Session session = sessionPool.get(msg.getToOtherId());
//判断session是否正常
if (session == null || !session.isOpen()) {
log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");
if (offlineMessageMap.containsKey(msg.getToOtherId())) {
List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());
list.add(msg);
offlineMessageMap.put(msg.getToOtherId(), list);
} else {
offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));
}
}//发送消息
else {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
return false;
}
}
return true;
}
//发送离线消息
public static Boolean sendOfflineMessageByUser(String message) {
WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
Session session = sessionPool.get(msg.getToOtherId());
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
return false;
}
return true;
}
/**
* 群发消息
*
* @param message 发送的消息
*/
public static void sendAllMessage(String message) {
log.info("发送消息:{}", message);
for (WebSocketSever webSocket : webSocketSet) {
try {
webSocket.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("群发消息发生错误:" + e.getMessage(), e);
}
}
}
}
启动项目,使用apiFox测试,新建webScoket接口
新建websocket1,连接后发送消息
新建webScoket2 ,可以看到连接后接收到了消息
如果webScoket2断开连接后, webScoket1继续发送消息,等webScoket2连接后就会收到离线的消息。