使用 WebSocket 实现前后端通信
1. Spring Boot 引入依赖
<dependency>
<!-- Spring Boot WebSocket starter. No <version> element is declared here; presumably it is inherited from the Spring Boot parent/BOM (TODO confirm against the project's pom). -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.添加配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that switches on WebSocket support by contributing a
 * {@link ServerEndpointExporter} bean to the application context.
 *
 * <p>NOTE(review): the exporter is presumably what registers the
 * {@code @ServerEndpoint}-annotated beans with the embedded container; confirm
 * against the Spring documentation before deploying to an external container.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Declares the exporter bean.
     *
     * @return a freshly created {@code ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3. 构建 WebSocketServer(连接缓存类 WebSockeServerLocalCache 与服务端端点类 WebSocketServer)
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
/**
 * Static, in-memory registry of {@link WebSocketServer} instances.
 *
 * <p>All state lives in a single static {@link CopyOnWriteArraySet}; the class
 * only offers static helpers to add, remove and look up instances by
 * {@code sid}.
 *
 * @version 1.0
 * @date 2025/1/20 10:46
 */
@Slf4j
public class WebSockeServerLocalCache {

    /**
     * Backing set of registered instances. The field-level {@code @Getter}
     * exposes this same live set (not a copy) to callers.
     */
    @Getter
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * Adds an instance to the registry.
     *
     * <p>A {@code null} argument is ignored: storing it would make every later
     * {@link #get(String)} fail while reading the element's {@code sid}.
     *
     * @param webSocketServer the instance to register; ignored when {@code null}
     */
    public static void save(WebSocketServer webSocketServer) {
        if (webSocketServer == null) {
            log.warn("忽略空的WebSocketServer实例,未添加到缓存");
            return;
        }
        log.info("添加到缓存中:{}", webSocketServer);
        webSocketSet.add(webSocketServer);
    }

    /**
     * Removes an instance from the registry.
     *
     * @param webSocketServer the instance to remove
     * @return {@code true} if the instance was present and has been removed,
     *         {@code false} otherwise
     */
    public static boolean remove(WebSocketServer webSocketServer) {
        return webSocketSet.remove(webSocketServer);
    }

    /**
     * Collects every registered instance whose {@code sid} equals the given one.
     *
     * @param sid the sid to match; {@code null} matches nothing
     * @return a new set holding the matching instances (possibly empty, never
     *         {@code null}); modifying it does not affect the registry
     */
    public static CopyOnWriteArraySet<WebSocketServer> get(String sid) {
        return webSocketSet.stream()
                // Compare from the argument side so neither a null sid nor an
                // instance without a sid can raise a NullPointerException.
                .filter(webSocketServer -> sid != null && sid.equals(webSocketServer.getSid()))
                .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
    }
}
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @version 1.0
* @description WebSocketServer实例
* @date 2025/1/20 10:46
**/
@Getter
@Slf4j
@Service
@ServerEndpoint(value = "/sysoperation/alarmWebSocketData/{sid}", encoders = {CustomEncoder.class})
public class WebSocketServer {
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
this.sid = sid;
//在线数加1
addOnlineCount();
//保存实例到set中
WebSockeServerLocalCache.save(this);
sendMessage("conn_success");
log.info("有新窗口开始监听sid{},当前在线人数为:{}",sid, getOnlineCount());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
//从set中删除
// webSocketSet.remove(this);
boolean remove = WebSockeServerLocalCache.remove(this);
log.info("移除的sid为:{},结果为{}",sid,remove);
//在线数减1
subOnlineCount();
//断开连接情况下,更新占用情况为释放
log.info("释放的sid为:" + sid);
//这里写你 释放的时候,要处理的业务 todo
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
// 连接关闭 停止任务
}
/**
* 收到客户端消息后调用的方法
*
* @Param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口sid{},的信息:{}",sid, message);
//群发消息
WebSockeServerLocalCache.get(sid).forEach(webSocketServer -> {
webSocketServer.sendMessage(message);
});
}
/**
* @Param session
* @Param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(Object message) {
try {
this.session.getBasicRemote().sendObject(message);
} catch (IOException | EncodeException e) {
log.error("推送消息发生异常:" + e.getMessage());
}
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {
log.info("推送消息到窗口" + sid + ",推送内容:" + message);
CopyOnWriteArraySet<WebSocketServer> webSocketServers = WebSockeServerLocalCache.get(sid);
for (WebSocketServer item : webSocketServers) {
//这里可以设定只推送给这个sid的,为null则全部推送
if (sid == null) {
// item.sendMessage(message);
} else if (item.sid.equals(sid)) {
item.sendMessage(message);
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
其他公共类
/**
 * Data holder for one alarm notification; accessors are generated by Lombok's {@code @Data}.
 *
 * NOTE(review): no usage is visible in this file; presumably this is the payload pushed to
 * clients through WebSocketServer#sendMessage and serialised by CustomEncoder. Confirm.
 */
@Data
public class AlarmInfoDTO implements Serializable {
// Alarm type, kept as text (allowed values are not defined here).
private String alarmType;
// Alarm time as a string; the format is not specified here.
private String time;
// Alarm body text.
private String alarmContent;
// Alarm title.
private String alarmTitle;
// Recipient of the alarm.
private String alarmReceivePerson;
// Defaults to "Real". NOTE(review): presumably distinguishes real alarms from test ones; confirm.
private String realAlarm="Real";
// Nested extension data attached to the alarm.
private AlarmInfoExtDataDTO extData;
}
/**
 * Extension data carried inside an {@code AlarmInfoDTO}; accessors are generated by Lombok's
 * {@code @Data}. Every field except {@code taskId} is a plain string.
 */
@Data
public class AlarmInfoExtDataDTO implements Serializable {
// Task id. The HTML test page in this file notes that the task id is used as the WebSocket sid.
private Long taskId;
// Identifier of the reporting device.
private String deviceId;
// Image location. NOTE(review): relation to storagePath is not visible here; confirm.
private String imagePath;
private String storagePath;
// Vehicle and licence-plate attributes, all kept as strings.
private String motorVehicleId;
private String plateNo;
private String plateColor;
private String plateClass;
private String vehicleClass;
private String vehicleBrand;
// Display name of the device.
private String deviceName;
// Coordinates stored as strings (units/format not specified here).
private String latitude;
private String longitude;
// Name of the area.
private String areaName;
}
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomEncoder implements Encoder.Text<Object> {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String encode(Object object) throws EncodeException {
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
throw new EncodeException(object, "Error encoding object", e);
}
}
@Override
public void init(EndpointConfig config) {
// 初始化时的逻辑(可选)
}
@Override
public void destroy() {
// 销毁时的逻辑(可选)
}
}
前端 HTML 测试页面
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Java后端WebSocket的Tomcat实现</title>
</head>
<body>
    <h1>Welcome</h1>
    <br/><input id="text" type="text" />
    <button onclick="send()">发送消息</button>
    <hr/>
    <button onclick="closeWebSocket()">关闭WebSocket连接</button>
    <hr/>
    <div id="message"></div>

    <!-- Manual test client: connects to the endpoint, echoes connection events and
         received messages into #message, and sends the input box content as JSON. -->
    <script type="text/javascript">
        var websocket = null;

        // Create the socket and attach handlers only when the browser supports
        // WebSocket; otherwise `websocket` stays null and the helpers below no-op.
        if ('WebSocket' in window) {
            // Change to your own address:
            // ws://127.0.0.1:10242/sysoperation/alarmWebSocketData/{sid} (the task id is used as the sid).
            websocket = new WebSocket("ws://127.0.0.1:10242/sysoperation/alarmWebSocketData/48");

            // Connection error callback.
            websocket.onerror = function() {
                setMessageInnerHTML("WebSocket连接发生错误");
            };

            // Connection established callback.
            websocket.onopen = function() {
                setMessageInnerHTML("WebSocket连接成功");
            };

            // Message received callback: show the payload, not the event object.
            websocket.onmessage = function(event) {
                console.log(event);
                setMessageInnerHTML(event.data);
            };

            // Connection closed callback.
            websocket.onclose = function() {
                setMessageInnerHTML("WebSocket连接关闭");
            };
        } else {
            alert('当前浏览器 Not support websocket');
        }

        // Close the socket when the window is closing, so the server does not see
        // the connection drop without a close handshake.
        window.onbeforeunload = function() {
            closeWebSocket();
        };

        // Appends one line of text to the page. The text is inserted as a text
        // node, so markup inside server messages is displayed, never executed.
        function setMessageInnerHTML(innerHTML) {
            var container = document.getElementById('message');
            container.appendChild(document.createTextNode(innerHTML));
            container.appendChild(document.createElement('br'));
        }

        // Closes the WebSocket connection if one was created.
        function closeWebSocket() {
            if (websocket !== null) {
                websocket.close();
            }
        }

        // Sends the input box content as {"msg": "..."}.
        function send() {
            if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
                setMessageInnerHTML("WebSocket未连接,无法发送消息");
                return;
            }
            var message = document.getElementById('text').value;
            // JSON.stringify escapes quotes and backslashes that would otherwise
            // break a hand-concatenated JSON string.
            websocket.send(JSON.stringify({ msg: message }));
            setMessageInnerHTML(message + " ");
        }
    </script>
</body>
</html>