Spring + WebSocket
1. 简介
WebSocket 是一种网络通信协议,提供了在单个TCP连接上进行全双工通信的能力。它允许服务器主动向客户端发送消息,而不需要客户端不断轮询服务器来检查更新。WebSocket 协议在2011年成为国际标准,并且被广泛用于实现实时通信功能,比如在线游戏、设备数据更新、实时聊天应用和股票行情更新等。
WebSocket 的主要特点包括:
-
全双工通信:客户端和服务器可以同时发送和接收数据,无需等待对方的响应。
-
持久连接:一旦WebSocket连接建立,它会保持开放状态,直到客户端或服务器决定关闭连接。
-
低延迟:由于不需要像HTTP那样每次通信都建立和关闭连接,WebSocket可以减少通信延迟。
-
头部开销小:WebSocket协议的头部比HTTP要小,这有助于减少数据传输的开销。
WebSocket 通信的建立通常遵循以下步骤:
-
握手:客户端通过发送一个特殊的HTTP请求来发起WebSocket连接,这个请求包含了Upgrade头部,表明客户端希望升级到WebSocket协议。
-
服务器响应:如果服务器支持WebSocket,它会响应一个HTTP响应,确认协议升级。
-
连接建立:一旦握手完成,客户端和服务器之间的连接就转变为WebSocket连接,双方可以开始发送数据。
WebSocket 使用ws(非加密)或wss(加密)作为URL协议前缀,例如:
-
ws://localhost
/api
/ws表示非加密的WebSocket连接。 -
wss://localhost
/
api/wss 表示加密的WebSocket连接。
2. Spring + React 实现ws通信
后端:SpringBoot
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置WebSocket:创建一个配置类来启用WebSocket并定义消息代理:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
这段代码是Spring Framework中用于配置WebSocket消息代理的Java配置类。它使用了Spring的WebSocketMessageBrokerConfigurer
接口来定义WebSocket通信的端点和消息代理的行为。以下是对这段代码的详细解释:
-
@Configuration
注解: 这个注解表明这是一个Spring配置类,Spring容器将会为这个类创建一个bean,并将其加入到应用上下文中。 -
@EnableWebSocketMessageBroker
注解: 这个注解启用了WebSocket消息代理,允许使用高级消息传递功能,例如消息的订阅和发布。它还启用了STOMP协议的支持。 -
implements WebSocketMessageBrokerConfigurer
: 这个类实现了WebSocketMessageBrokerConfigurer
接口,该接口允许配置WebSocket消息代理的细节。 -
registerStompEndpoints(StompEndpointRegistry registry)
方法: 这个方法用于注册STOMP协议的端点。在这个配置中,它添加了一个端点/ws
,并且指定使用SockJS协议。SockJS是一个浏览器JavaScript库,它提供了一个透明的、跨域的WebSocket兼容接口。-
registry.addEndpoint("/ws")
:注册一个新的WebSocket端点/ws
。 -
.withSockJS()
:指定这个端点使用SockJS协议。
-
-
configureMessageBroker(MessageBrokerRegistry registry)
方法: 这个方法用于配置消息代理的行为。-
registry.enableSimpleBroker("/topic")
:启用一个简单的消息代理,它将消息路由到以/topic
为前缀的目的地。这意味着所有发送到/topic
前缀的目的地的消息都将被广播给所有订阅了这些目的地的客户端。 -
registry.setApplicationDestinationPrefixes("/app")
:设置应用程序目的地前缀为/app
。这意味着所有以/app
为前缀的目的地都将被用作应用程序的端点,例如/app/sendMessage
。
-
通过这个配置,Spring Boot应用将能够处理WebSocket连接,并且可以使用STOMP协议发送和接收消息。客户端可以通过/ws
端点连接到服务器,并且可以订阅以/topic
为前缀的目的地来接收消息,或者发送消息到以/app
为前缀的目的地。这个配置为构建基于WebSocket的实时通信应用提供了基础。
触发器:controller
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class WebSocketController {
@MessageMapping("/sendMessage")
@SendTo("/topic/messages")
public String processMessageFromClient(String message) {
return "Server response: " + message;
}
}
这段代码是一个Spring框架中的WebSocket控制器,使用了STOMP协议来处理WebSocket消息。下面是对这段代码的详细解释:
-
@Controller
注解: 这个注解表明该类是一个WebSocket控制器,Spring将使用它来处理WebSocket相关的消息。 -
@MessageMapping("/sendMessage")
注解: 这个注解定义了一个消息映射,它告诉Spring当接收到发送到/app/sendMessage
(因为还有一个setApplicationDestinationPrefixes("/app")
的配置)的WebSocket消息时,应该调用processMessageFromClient
方法来处理这个消息。 -
@SendTo("/topic/messages")
注解: 这个注解指定了处理方法返回的消息应该被发送到哪个目的地。在这个例子中,processMessageFromClient
方法的返回值将被发送到/topic/messages
,这意味着所有订阅了/topic/messages
的客户端都将收到这个消息。 -
processMessageFromClient
方法: 这个方法是实际处理客户端消息的方法。它接收一个字符串参数message
,这个字符串是从客户端接收到的消息内容。方法返回一个字符串,这个字符串是服务器对客户端消息的响应。-
方法参数:
String message
是从客户端接收到的消息。 -
方法返回值:
String
是服务器对客户端消息的响应,这里简单地在客户端消息前加上了前缀"Server response: "
。
-
当客户端通过WebSocket连接并发送消息到/app/sendMessage
时,Spring将自动调用processMessageFromClient
方法,并将客户端发送的消息作为参数传递给这个方法。方法处理完消息后,返回的响应将被发送到所有订阅了/topic/messages
的客户端。
触发器:端点
使用@ServerEndpoint
注解定义WebSocket端点:
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ServerEndpoint("/ws")
public class WebSocketEndpoint {
private static final Set<WebSocketEndpoint> endpoints = new CopyOnWriteArraySet<>();
public WebSocketEndpoint() {
endpoints.add(this);
}
@OnOpen
public void onOpen(Session session) {
System.out.println("New connection opened: " + session.getId());
}
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("Received message from " + session.getId() + ": " + message);
for (WebSocketEndpoint endpoint : endpoints) {
try {
endpoint.sendMessage(session.getId(), message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@OnClose
public void onClose(Session session) {
System.out.println("Connection closed: " + session.getId());
endpoints.remove(this);
}
@OnError
public void onError(Session session, Throwable throwable) {
System.out.println("Error in connection: " + session.getId());
throwable.printStackTrace();
endpoints.remove(this);
}
public void sendMessage(String sessionId, String message) throws IOException {
Session session = sessions.get(sessionId);
if (session != null) {
session.getBasicRemote().sendText(message);
}
}
}
使用@ServerEndpoint
注解来定义WebSocket端点,而不是通过控制器层的处理器。这样,当客户端连接到/ws
端点时,会自动触发WebSocketEndpoint
实例的onOpen
方法,从而建立WebSocket连接。
-
@ServerEndpoint("/ws")
: 这是一个注解,用于定义WebSocket的端点。在这个例子中,客户端可以通过访问/ws
路径来建立WebSocket连接。 -
private static final Set<WebSocketEndpoint> endpoints
: 这是一个静态的Set
集合,用于存储所有活动的WebSocketEndpoint
实例。它使用CopyOnWriteArraySet
,这是一种线程安全的变体,适用于读多写少的场景。 -
public WebSocketEndpoint()
: 这是WebSocketEndpoint
类的构造函数。在创建新的实例时,它会将自己添加到endpoints
集合中。 -
@OnOpen public void onOpen(Session session)
: 这个方法在新的WebSocket会话打开时被调用。Session
对象代表与客户端的连接,可以用于发送消息和关闭连接。在这个方法中,通常可以执行一些初始化操作,比如打印日志或将用户信息与会话关联。 -
@OnMessage public void onMessage(String message, Session session)
: 这个方法在服务器接收到客户端发送的消息时被调用。它接收消息内容和与之关联的Session
对象。在这个例子中,服务器接收到消息后,会将消息打印到控制台,并将相同的消息广播给所有连接的客户端。 -
@OnClose public void onClose(Session session)
: 这个方法在WebSocket会话关闭时被调用。在这里,可以执行一些清理操作,比如从endpoints
集合中移除当前的WebSocketEndpoint
实例。 -
@OnError public void onError(Session session, Throwable throwable)
: 这个方法在WebSocket会话发生错误时被调用。它接收与错误关联的Session
对象和一个Throwable
对象,后者包含了错误的详细信息。在这里,可以处理错误,比如打印堆栈跟踪或从endpoints
集合中移除当前的WebSocketEndpoint
实例。 -
public void sendMessage(String sessionId, String message) throws IOException
: 这个方法用于向特定的会话发送消息。它接收会话ID和要发送的消息内容。方法内部,它查找与给定会话ID关联的Session
对象,并使用BasicRemote
对象发送消息。如果会话不存在或发生IO异常,可能会抛出IOException
。
前端:React
安装SockJS和STOMP客户端:使用npm安装SockJS和STOMP客户端库:
npm install sockjs-client stompjs
创建WebSocket逻辑:在React组件中,使用SockJS和STOMP客户端连接到后端WebSocket服务器,并发送/接收消息。
import React, { useEffect, useState } from 'react';
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
function App() {
const [messages, setMessages] = useState([]);
const [inputValue, setInputValue] = useState('');
useEffect(() => {
const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, frame => {
stompClient.subscribe('/topic/messages', message => {
const messageBody = JSON.parse(message.body);
setMessages(prevMessages => [...prevMessages, messageBody]);
});
});
const sendMessage = () => {
stompClient.send("/app/sendMessage", {}, inputValue);
setInputValue(''); // Clear input field after sending
};
return () => {
stompClient.disconnect();
};
}, [inputValue]);
return (
<div>
<h1>WebSocket Messages</h1>
<ul>
{messages.map((message, index) => (
<li key={index}>{message}</li>
))}
</ul>
<input
value={inputValue}
onChange={e => setInputValue(e.target.value)}
placeholder="Type a message"
/>
<button onClick={sendMessage}>Send</button>
</div>
);
}
export default App;
-
这个钩子用于处理组件挂载和卸载时的逻辑。
-
当组件挂载时(
useEffect
无依赖或依赖项变化时),它创建一个SockJS
连接和一个Stomp
客户端。 -
stompClient.connect
用于建立与服务器的STOMP连接。连接成功后,它订阅/topic/messages
主题,以便接收服务器发送的消息。 -
setMessages
用于更新状态,将新接收的消息添加到messages
数组中。 -
sendMessage
函数用于发送消息到服务器。它将inputValue
发送到/app/sendMessage
目的地,并在发送后清空输入框。
3. 实际业务
一批设备通过mqtt传输消息,前端页面需要显示设备实时信息,如天气、风力、速度等。业务逻辑可以为,后端代码接收到设备通过mqtt传输的消息,将其一定量转化,然后通过ws协议将数据推送,最后前端拿到数据显示。
1. 定义端点
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageConfiguration implements WebSocketMessageBrokerConfigurer {
@Autowired
private AuthPrincipalHandler authPrincipalHandler;
@Autowired
private WebSocketDefaultFactory webSocketDefaultFactory;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// Set the WebSocket connection address
registry.addEndpoint("/api/v1/ws").setAllowedOriginPatterns("*")
.setHandshakeHandler(authPrincipalHandler);
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.addDecoratorFactory(webSocketDefaultFactory);
registry.setTimeToFirstMessage(60000 * 60 * 24 * 10);
}
}
-
implements WebSocketMessageBrokerConfigurer
:这个类实现了WebSocketMessageBrokerConfigurer
接口,该接口允许配置WebSocket消息代理的细节。 -
AuthPrincipalHandler authPrincipalHandler
:这是一个自定义的握手处理器,用于在WebSocket连接建立之前进行一些认证操作。 -
WebSocketDefaultFactory webSocketDefaultFactory
:这是一个自定义的装饰器工厂,用于添加一些默认的配置或处理逻辑。 -
registerStompEndpoints(StompEndpointRegistry registry)
:这个方法用于注册STOMP协议的端点。在这个例子中,端点被设置为/api/v1/ws
,并且允许来自任何源的连接(setAllowedOriginPatterns("*")
)。同时,设置了自定义的握手处理器authPrincipalHandler
。 -
configureWebSocketTransport(WebSocketTransportRegistration registry)
:这个方法用于配置WebSocket传输层的一些设置。在这个例子中,它添加了一个自定义的装饰器工厂webSocketDefaultFactory
,并且设置了首次消息时间(timeToFirstMessage
)为10天。
2. 消息服务
@Service
@Slf4j
public class SendMessageServiceImpl implements ISendMessageService {
@Autowired
private ObjectMapper mapper;
@Autowired
private IWebSocketManageService webSocketManageService;
@Override
public void sendMessage(ConcurrentWebSocketSession session, CustomWebSocketMessage message) {
if (session == null) {
return;
}
try {
if (!session.isOpen()) {
session.close();
log.debug("This session is closed.");
return;
}
session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message)));
} catch (IOException e) {
log.info("Failed to publish the message. {}", message.toString());
e.printStackTrace();
}
}
@Override
public void sendBatch(Collection<ConcurrentWebSocketSession> sessions, CustomWebSocketMessage message) {
if (sessions.isEmpty()) {
return;
}
try {
TextMessage data = new TextMessage(mapper.writeValueAsBytes(message));
for (ConcurrentWebSocketSession session : sessions) {
if (!session.isOpen()) {
session.close();
log.debug("This session is closed.");
return;
}
session.sendMessage(data);
}
} catch (IOException e) {
log.info("Failed to publish the message. {}", message.toString());
e.printStackTrace();
}
}
@Override
public void sendBatch(String workspaceId, Integer userType, String bizCode, Object data) {
if (!StringUtils.hasText(workspaceId)) {
throw new RuntimeException("Workspace ID does not exist.");
}
Collection<ConcurrentWebSocketSession> sessions = Objects.isNull(userType) ?
webSocketManageService.getValueWithWorkspace(workspaceId) :
webSocketManageService.getValueWithWorkspaceAndUserType(workspaceId, userType);
this.sendBatch(sessions, CustomWebSocketMessage.builder()
.data(data)
.timestamp(System.currentTimeMillis())
.bizCode(bizCode)
.build());
}
@Override
public void sendBatch(String workspaceId, String bizCode, Object data) {
this.sendBatch(workspaceId, null, bizCode, data);
}
}
底层消息发送是通过ConcurrentWebSocketSessionDecorator
类中的sendMessage方法
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator
{
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (!this.shouldNotSend()) {
this.buffer.add(message);
this.bufferSize.addAndGet(message.getPayloadLength());
if (this.preSendCallback != null) {
this.preSendCallback.accept(message);
}
do {
if (!this.tryFlushMessageBuffer()) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("Another send already in progress: session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", this.getId(), this.getTimeSinceSendStarted(), this.getBufferSize()));
}
this.checkSessionLimits();
break;
}
} while(!this.buffer.isEmpty() && !this.shouldNotSend());
}
}
}
3. 使用
@Autowired
private SendMessageServiceImpl sendMessageService;
@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_HMS)
public void handleHms(CommonTopicReceiver receiver, MessageHeaders headers) {
List<DeviceHmsDTO> unReadList = new ArrayList<>();
objectMapper.convertValue(((Map) (receiver.getData())).get(MapKeyConst.LIST),
new TypeReference<List<DeviceHmsReceiver>>() {
})
.forEach(hmsReceiver -> {
final DeviceHmsEntity hms = entity.clone();
this.fillEntity(hms, hmsReceiver);
// The same unread hms are no longer incremented.
if (hmsMap.contains(hms.getHmsKey())) {
return;
}
this.fillMessage(hms, hmsReceiver.getArgs());
DockDevice dagDevice = this.dagBridge.getDockDevice(sn);
if (dagDevice != null&&hms.getLevel()==2) {
dagDevice.drone.AlertInfoReport(hms.getMessageZh());
}
unReadList.add(entity2Dto(hms));
mapper.insert(hms);
});
sendMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
BizCodeEnum.DEVICE_HMS.getCode(),
TelemetryDTO.<List<DeviceHmsDTO>>builder().sn(sn).host(unReadList).build());
}
在这个消息通道中有消息,接收消息转换并将其发送到http://localhost:1234/api/ws。前端代码订阅该主题即可获取响应消息。
注:有没有发现这个很像发布/订阅模型?