消息队列MQ(RabbitMQ)
在现代软件架构中,消息队列(Message Queue,简称MQ)是一种非常重要的中间件,广泛应用于分布式系统、微服务架构以及异步通信场景中。消息队列通过允许应用程序之间通过消息进行通信,从而实现解耦、提高系统的可扩展性和可靠性。本文将介绍消息队列的基本概念、常见实现方式,并通过一个简单的Java示例来展示如何使用消息队列。
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
目比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
RabbitMQ: One broker to queue them all | RabbitMQhttps://www.rabbitmq.com/ 安装过后,有几个专有名称需要知道
-
publisher
:生产者,也就是发送消息的一方 -
consumer
:消费者,也就是消费消息的一方 -
queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 -
exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。 -
virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
消息发送
首先配置MQ地址,在publisher
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: # 你的虚拟机IP
port: # 端口
virtual-host: # 虚拟主机
username: # 用户名
password: # 密码
然后在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
首先配置MQ地址,在consumer
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: # 你的虚拟机IP
port: # 端口
virtual-host: # 虚拟主机
username: # 用户名
password: # 密码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}