1、引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
2、保存登录信息实体类
@Data
public class ClientInfoEntity {
/**
* 客户端连接的session
*/
private Session session;
/**
* 连接存活时间
*/
private LocalDateTime existTime;
}
3、SpringContextHolder
@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
private static ApplicationContext applicationContext = null;
private static Logger logger = LoggerFactory.getLogger(SpringContextHolder.class);
public SpringContextHolder() {
}
public static <T> T getBean(Class<T> requiredType) {
assertContextInjected();
return applicationContext.getBean(requiredType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextHolder.applicationContext != null) {
logger.info("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
}
SpringContextHolder.applicationContext = applicationContext;
}
@Override
public void destroy() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext);
}
applicationContext = null;
}
private static void assertContextInjected() {
}
}
4、创建ServerEndpoint
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
// 该类负责监听客户端的连接、断开连接、接收消息、发送消息等操作
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/notice/websocket")
public class WsServerEndpoint {
//key:accountName、sessionId
private static Table<String, String, ClientInfoEntity> table = HashBasedTable.create();
private static final int EXIST_TIME_HOUR = 2;
// 注:spring bean生命周期与websocket不一样,不能直接注入
@Autowired
private TmsgNoticeRecordService tmsgNoticeRecordService = null;
@Autowired
private AuthClientUtils authClientUtils = null;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
if (null == authClientUtils) {
authClientUtils = SpringContextHolder.getBean(AuthClientUtils.class);
}
String accountName = null;
Map<String, List<String>> params = session.getRequestParameterMap();
List<String> tokens = params.get("token");
if (CollectionUtils.isNotEmpty(tokens)) {
try {
accountName = authClientUtils.getCurrentAccountName(tokens.get(0));
} catch (Exception e) {
log.error("webSocket:", e);
}
}
if (null == accountName) {
log.info("WebSocket 连接建立失败");
throw new RuntimeException("accountName获取失败,WebSocket 连接建立失败");
}
if (table.containsColumn(session.getId())) {
log.info("token已建立连接");
return;
}
if (null == tmsgNoticeRecordService) {
tmsgNoticeRecordService = SpringContextHolder.getBean(TmsgNoticeRecordService.class);
}
//业务逻辑
try {
// 通过账号查询当前登录人接收到所有未读的消息
List<Map<String,Object> list = tmsgNoticeRecordService.pushDataList(accountName);
if (CollectionUtils.isNotEmpty(list)) {
for (int i = 0; i < list.size(); i++) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg", list.get(i).get("msg"));
// 发送消息
session.getAsyncRemote().sendText(jsonObject.toJSONString());
}
}
} catch (Exception e) {
log.info("业务逻辑异常", e);
}
//把成功建立连接的会话在实体类中保存
ClientInfoEntity entity = new ClientInfoEntity();
entity.setSession(session);
entity.setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
table.put(accountName, session.getId(), entity);
log.info("WebSocket 连接建立成功: " + accountName);
}
/**
* 当断开连接时调用该方法
*
* @param session
*/
@OnClose
public void onClose(Session session) {
if (!table.isEmpty() && table.containsColumn(session.getId())) {
Iterator<Map.Entry<String, ClientInfoEntity>> iterator = table.column(session.getId()).entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ClientInfoEntity> rowValue = iterator.next();
table.remove(rowValue.getKey(), session.getId());
log.info("WebSocket 连接关闭成功");
}
}
}
/**
* 接受消息
* 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
*/
@OnMessage
public void onMessage(Session session, String message) throws IOException {
if (!table.isEmpty() && table.containsColumn(session.getId())) {
Iterator<Map.Entry<String, ClientInfoEntity>> iterator = table.column(session.getId()).entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ClientInfoEntity> cell = iterator.next();
//如果是心跳包
if ("heartbeat".equals(message)) {
//只要接受到客户端的消息就进行续命(时间)
cell.getValue().setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
if (cell.getValue().getSession().isOpen()) {
cell.getValue().getSession().getBasicRemote().sendText("heartbeat");
}
return;
}
//只要接受到客户端的消息就进行续命(时间)
cell.getValue().setExistTime(LocalDateTime.now().plusHours(EXIST_TIME_HOUR));
if (cell.getValue().getSession().isOpen()) {
cell.getValue().getSession().getBasicRemote().sendText("heartbeat");
}
}
}
}
/**
* 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
*/
@OnError
public void onError(Throwable error) {
log.error("webSocket报错信息:" + error.toString());
}
/**
* 心跳 每20秒发送一次
*/
@XxlJob("message.heartbeat")
public ReturnT<String> heartbeat() {
XxlJobHelper.log("心跳");
if (!table.isEmpty()) {
//超过存活时间进行删除
Iterator<Table.Cell<String, String, ClientInfoEntity>> iterator = table.cellSet().iterator();
while (iterator.hasNext()) {
Table.Cell<String, String, ClientInfoEntity> cell = iterator.next();
if (cell.getValue().getSession().isOpen()) {
cell.getValue().getSession().getAsyncRemote().sendText("heartbeat");
} else {// 已经断开连接
table.remove(cell.getRowKey(), cell.getColumnKey());
}
}
}
return ReturnT.SUCCESS;
}
/**
* @Description: 同账号多次登录,通过账号发送消息
*/
private void send(String rowKey, String msg) {
Iterator<Map.Entry<String, ClientInfoEntity>> columnIterator = table.row(rowKey).entrySet().iterator();
while (columnIterator.hasNext()) {
Map.Entry<String, ClientInfoEntity> column = columnIterator.next();
if (column.getValue().getSession().isOpen()) {
column.getValue().getSession().getAsyncRemote().sendText(msg);
}
}
}
}
WebSocket配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@EnableWebSocket
@Configuration
public class WebSocketConfig {
/**
* 该方法用来创建并返回一个ServerEndpointExporter实例。
* 这个实例的作用是扫描并自动配置所有使用@ServerEndpoint注解标记的WebSocket端点
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
启动应用程序并测试
1、启动类加@EnableWebSocket注解
2、使用websocket测试工具访问: ws://127.0.0.1:8080//notice/websocket?token=xxxxx