消息队列篇--通信协议篇--STOMP(STOMP特点、格式及示例,WebSocket上使用STOMP,消息队列上使用STOMP等)
STOMP(Simple Text Oriented Messaging Protocol,简单面向文本的消息传递协议)是一种轻量级、基于文本的协议,旨在为消息代理(消息队列)和客户端之间的通信(websocket)提供一种简单的接口。它通常运行在TCP或WebSocket之上,并广泛用于实现发布/订阅、点对点消息传递等模式。
STOMP提供了一种简单的机制来发送和接收消息,适用于各种消息中间件系统,如ActiveMQ、RabbitMQ和Apache Kafka等。
特点:
- 基于文本的协议,简单易用,适合快速开发。
- 支持多种消息传递模式(如发布/订阅、点对点)。
- 支持事务和消息确认机制。
- 轻量级,适用于资源受限的环境(宽带低等)。
- 支持多种消息代理(如RabbitMQ、ActiveMQ)。
- 支持客户端之间的通信(如:websocket)
适用场景:
- 实时通信应用(如聊天系统、社交网络)。
- 微服务之间的异步通信。
- IoT设备之间的轻量级通信。
1、STOMP基本概念
(1)、目的地(Destination)
目的地是消息发送或接收的目标地址。
常见的目的地类型包括:
- 队列(Queue):点对点消息传递模式,每条消息只会被一个消费者处理。
- 主题(Topic):发布/订阅模式,每条消息会被所有订阅者处理。
(2)、命令(Commands)
STOMP定义了若干命令,用于控制消息的发送、接收和管理。
常见的命令包括:
- CONNECT:建立连接。
- SEND:发送消息到指定的目的地。
- SUBSCRIBE:订阅某个目的地,接收该目的地的消息。
- UNSUBSCRIBE:取消订阅某个目的地。
- ACK:确认消息已被成功处理。
- NACK:拒绝或无法处理消息。
- DISCONNECT:断开连接。
(3)、头信息(Headers)
头信息是伴随每个命令的键值对,用于传递额外的元数据。
常见的头信息包括:
- destination:消息的目的地。
- id:订阅的唯一标识符。
- receipt:请求服务器返回一个收据,确认命令已执行。
(4)、消息体(Body)
消息体包含实际的消息内容,可以是任意格式的数据(如JSON、XML、纯文本等)。消息体必须以空字节\u0000结束。
2、STOMP消息格式
每条STOMP消息由命令行、头信息和消息体组成,各部分之间用换行符\n分隔,整个消息以两个连续的换行符\n\n结束。
示例:CONNECT命令
CONNECT
accept-version:1.2
host:stomp.example.com
^@
解释:
- CONNECT:命令名称。
- accept-version:1.2:表示支持的STOMP版本。
- host:stomp.example.com:目标主机。
- ^@:表示消息体为空,用一个空字节(ASCII码为0)来表示。
示例:SEND命令
SEND
destination:/queue/work
Hello, STOMP!
^@
解释:
- SEND:命令名称。
- destination:/queue/work:消息的目的地。
- Hello, STOMP!:消息的内容。
- ^@:表示消息结束。
示例:SUBSCRIBE命令
SUBSCRIBE
id:sub-001
destination:/topic/greetings
^@
解释:
- SUBSCRIBE:命令名称。
- id:sub-001:订阅的唯一标识符。
- destination:/topic/greetings:要订阅的主题地址。
- ^@:表示消息体为空。
3、STOMP工作流程
原理示意图:
(1)、连接
客户端首先需要通过CONNECT命令与STOMP服务器建立连接。如果连接成功,服务器会返回一个CONNECTED响应。
客户端示例:
CONNECT
accept-version:1.2
host:stomp.example.com
^@
服务器响应示例:
CONNECTED
version:1.2
session:session-id-12345
^@
(2)、订阅
客户端可以通过SUBSCRIBE命令订阅某个目的地,接收该目的地的消息。
客户端示例:
SUBSCRIBE
id:sub-001
destination:/topic/greetings
^@
(3)、发送消息
客户端可以通过SEND命令向某个目的地发送消息。
客户端示例:
SEND
destination:/queue/work
Hello, STOMP!
^@
(4)、接收消息
当有消息到达客户端订阅的目的地时,服务器会将消息推送到客户端。
消息示例:
MESSAGE
subscription:sub-001
message-id:message-id-67890
destination:/topic/greetings
Hello, World!
^@
(5)、断开连接
客户端可以通过DISCONNECT命令断开与服务器的连接。
客户端示例:
DISCONNECT
^@
4、在WebSocket上使用STOMP
在WebSocket之上使用STOMP时,STOMP消息作为WebSocket数据帧的有效载荷进行传输。这种方式结合了WebSocket的全双工通信能力和STOMP的结构化消息传递功能。
示例:JavaScript中使用WebSocket和STOMP
const socket = new WebSocket('ws://example.com/stomp-endpoint'); // 建立websocket连接
socket.onopen = function() {
// 发送CONNECT命令
const connectMessage = CONNECT\naccept-version:1.2\nhost:example.com\n\n\u0000;
socket.send(connectMessage);
// 发送SUBSCRIBE命令
const subscribeMessage = SUBSCRIBE\nid:sub-001\ndestination:/topic/greetings\n\n\u0000;
socket.send(subscribeMessage);
};
socket.onmessage = function(event) {
console.log('Received:', event.data);
// 解析收到的STOMP消息
if (event.data.startsWith('MESSAGE')) {
console.log('New message:', event.data.split('\n\n')[1].trim());
}
};
5、代码示例
依赖库:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-messaging</artifactId>
</dependency>
发送消息:
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
public class StompProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 WebSocket 客户端
StandardWebSocketClient wsClient = new StandardWebSocketClient();
SockJsClient sockJsClient = new SockJsClient(Collections.singletonList(new WebSocketTransport(wsClient)));
// 创建STOMP客户端
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
// 连接到STOMP代理
StompSession session = stompClient.connect("ws://localhost:8080/ws", new StompSessionHandlerAdapter() {}).get();
// 发送消息
StompHeaders headers = new StompHeaders();
headers.setDestination("/app/hello");
session.send(headers, "Hello, STOMP!");
System.out.println("Message sent");
}
}
接收消息:
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
public class StompConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建WebSocket客户端
StandardWebSocketClient wsClient = new StandardWebSocketClient();
SockJsClient sockJsClient = new SockJsClient(Collections.singletonList(new WebSocketTransport(wsClient)));
// 创建STOMP客户端
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
// 连接到STOMP代理
StompSession session = stompClient.connect("ws://localhost:8080/ws", new StompSessionHandlerAdapter() {}).get();
// 订阅主题并设置回调
session.subscribe("/topic/greetings", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("Received message: " + payload);
}
});
// 保持连接
Thread.sleep(Long.MAX_VALUE);
}
}
6、总结
STOMP是一种简单而强大的消息传递协议,特别适合于需要灵活消息路由的应用场景。通过运行在WebSocket或其他传输协议之上。
STOMP提供了以下优势:
- 易用性:基于文本的协议,易于实现和调试。
- 灵活性:支持多种消息传递模式,适应不同的应用场景。
- 跨平台:可以在多种编程语言和平台上使用,具有良好的互操作性。
通过理解STOMP的基本概念、命令和工作流程,开发者可以有效地利用这一协议构建高效的消息传递系统。
乘风破浪!Dare to Be!!!