RabbitMQ消息中间件
一、初始MQ
首先了解一下微服务间通讯有同步和异步两种方式:
- 同步通讯:是指两个或多个系统在进行信息交换时,必须在同一时刻进行操作
- 异步通讯:是指两个或多个系统之间的通讯方式,其中发送方和接收方不是在同一时刻进行操作。
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的缺点:
- 多个系统间耦合,扩展及后续维护繁琐
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败风险
异步通讯:
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量削峰
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理(对程序员的技术要求高了)
- 需要依赖于Broker的可靠、安全、性能(搭建集群)
1. 技术对比
MQ,中文是消息队列(Message Queue),字面来看就是存放消息的队列。
比较常见的MQ实现(也被称为消息中间件):
- ActiveMQ
- **RabbitMQ**
- **RocketMQ**
- Kafka
几种常见MQ的对比:
对比 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP SMTP,STOMP | OpenWire,STOMP REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
选择原则:
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
2. 原生JavaAPI实现MQ
在这之前,先认识RabbitMQ中的一些角色:
- publisher:生产者(使用Java代码发送消息)
- consumer:消费者(使用Java代码接收消息)
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息
2.1 MQ的消息模型
- 简单队列
- 工作队列模式
- 发布订阅模式
- Fanout广播
- Direct定向模式
- Topic主题
- 消息转换器
下面使用原生API只展示简单队列模式
2.2 原生JavaAPI实现简单队列
简单队列模式的模型图:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的消息
下面使用的是官方提供的原生JavaAPI完成的;不用自己手敲代码练习,下面有利用Spring简化开发的方案
//生产端publisher实现
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.200.130");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("用户名");//设置自己的用户名和密码
factory.setPassword("*****");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
/***********************************************************************************************/
//消费端consumer实现
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.200.130");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("用户"); //用户密码和上面的生产端保持一致
factory.setPassword("*****");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
小结:
基本消息队列的消息发送流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息
基本消息队列的消息接收流程:
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定
3.基于SpringAMQP实现MQ
SpringAMQP是基于RabbitMQ封装的一套模板,并且利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系(代码+注解)
- 封装了RabbitTemplate工具,用于发送消息 :rabbitTemplate.convertAndSend()
- 基于注解的监听器模式,异步接收消息:@RabbitListener
在父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.简单队列
消息发送:
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.200.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: 用户名 # 自己的用户名(不能为中文)和密码
password: *****
在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送。
代码实现如下:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTempslate;
@Test //不要导错包,用比较长的import org.junit.jupiter.api.Test;
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息:此处并不会自动创建队列
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收:
首先配置MQ地址,在consumer服务的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.200.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
在consumer服务的中新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
2.工作队列(Work queues)
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,
消息就会堆积越来越多,无法及时处理。如何解决呢?
- 那我们可以让多个消费者绑定到一个队列,共同消费队列中的消息。
这个就称为Work queues,也被称为(Task queues),任务模型。可以使用work 模型,
多个消费者共同处理消息处理,速度就能大大提高了。
消息发送:
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 1; i <= 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消息接收:
要模拟多个消费者绑定同一个队列,我们在consumer中添加2个新的方法:
//@RabbitListener(queues = "simple.queue")
//public void listenSimpleQueueMessage(String msg) {
// System.out.println(msg);
//}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println(LocalTime.now() + "消费者1:" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println(LocalTime.now() + "消费者2:" + msg);
Thread.sleep(200);
}
运算之后得到结果:消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
怎样解决这个问题呢?
我们可以修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
host: 192.168.200.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: 用户名 # 自己的用户名和密码
password: ****
listener: #监听
simple: #简单消息模型
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
3.发布/订阅
发布订阅的模型如图:
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给exchage(交换机)
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
- Exchange:交换机(消息路由)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,
- 例如递交给某个特别队列、递交给所有队列、或将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有以下3种类型:
- Fanout(扇出):广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,
或者没有符合路由规则的队列,那么消息会丢失!
4.Fanout广播
Fanout,英文翻译是扇出,在MQ中理解成广播更合适。
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
声明队列和交换机
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:UML类图
在consumer服务中创建一个类,声明队列和交换机:
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean // 方法中的参数,从IoC容器中获取
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
5.Direct定向
在Fanout模式中,一条消息,会被所有订阅的队列都消费。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)。
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,
只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息。
案例需求如下:
1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3. 在publisher中编写测试方法,向itcast. direct发送消息
声明队列和交换机
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), //创建队列
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),//创建交换机
key = {"red", "blue"} //绑定接受消息的key
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息:key=red,两个消费者都能收到消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
// 发送消息:key=blue,消费者1 能收到消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
// 发送消息:key=yellow,消费者2 能收到消息
rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
}
总结:
Direct交换机与Fanout交换机的差异
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机的常见注解
- 开始声明:bindings= ?
- 指定一个绑定关系: @QueueBinding
- 声明队列:value = @Queue
- 声明交换机:exchange = @Exchange
- 指定路由key:key = {一个或多个}
6.Topic主题
`Topic`类型的`Exchange`与`Direct`相比:
- 相同点:都可以根据`RoutingKey`把消息路由到不同的队列
- 不同点:`Topic`类型`Exchange`可以让队列在绑定`Routing key` 的时候使用通配符!
> `Routingkey` 一般都是有一个或多个单词组成,多个单词之间以”.”分割,
例如: `item.insert`, item.del
> 通配符规则:
>
> `#`:匹配零个,一个或多个词,任意多个【常用】
>
> `*`:匹配不多不少必须是1个词
举例:
demo.#:能够匹配demo, demo.spu, demo.spu.insert
demo.*:只能匹配demo.spu
实现思路如下:
1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey
2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3. 在publisher中编写测试方法,向itcast. topic发送消息
- Queue1:假设绑定的是`china.#` ,因此凡是以 `china.`开头的`routing key` 都会被匹配到。
- 包括china.news和china.weather
- Queue2:假设绑定的是`#.news` ,因此凡是以 `.news`结尾的 `routing key` 都会被匹配。
- 包括china.news和japan.news
消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "demo.queue1"),
exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "demo.queue2"),
exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到demo.queue2的消息:【" + msg + "】");
}
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "demo.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", "明天天气晴,20-36度");
}
# 总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以 `**.**` 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- `#`:代表0个,1个或多个词
- `*`:代表1个词
7.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为下面的message对象。
void convertAndSend(String exchange, String routingKey,Object message) throw AmqpException;
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 可读性差
测试默认转换器:
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("simple.queue", msg);
}
1、执行前:先停止consumer服务,防止消息被消费掉,无法在RabbitMQ控制台看到
2、MQ服务上没有simple.queue(临时),通过管理端快速创建一个
发送消息后查看控制台:
# 配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化
和反序列化。
1、在publisher和consumer两个服务中都引入依赖,因此咱们选择在父工程添加:
<!-- mq-demo的pom.xml,jackson(SpringBoot用的) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2、配置消息转换器
#在PublisherApplication和ConsumerApplication两个启动类中都添加一个Bean
PublisherApplication,作用:Java对象 =====》JSON字符串
import org.springframework.amqp.support.converter.MessageConverter;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean //注意导包:org.springframework.amqp.support.converter.MessageConverter
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
3、重新发送消息:通过管理平台查询效果
4、接受消息:SpringRabbitListener
@RabbitListener(queues = "simple.queue")
public void listenObjectQueue(Map<String,Object> msg){
System.out.println("接收到object.queue的消息:" + msg);
}
二、MQ高级
1.消息可靠性
消息从发送到消费者接收,会经历多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制(发送时丢失)
- 消息持久化(MQ宕机)
- 消费者确认机制(消费者宕机)
- 失败重试机制(消费失败)
1.1.生产者消息确认
RabbitMQ提供了生产者确认机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。
消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回通知,及路由失败原因。
- 正常到达队列,没有任何回复(没有回复就是成功)
确认机制发送消息时,需要给每个消息设置一个全局唯一Id,以区分不同消息,避免ack冲突。
举个栗子:
修改publisher服务中的application.yml文件,添加下面的内容:
spring:
rabbitmq:
host: 192.168.200.130 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /
publisher-confirm-type: correlated #判断是否到达交换机(异步通知)
publisher-returns: true #判断是否到达队列
template:
mandatory: true #定义消息路由失败时的策略
解释说明一下:
- `publish-confirm-type`:开启publisher-confirm,这里支持两种类型:
- `simple`:同步等待confirm结果,直到超时【一般不使用,影响性能】
- `correlated`:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
- `publish-returns`:
- `true`:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
- `false`:关闭publish-return功能
- `template.mandatory`:定义消息路由失败时的策略。
- true则调用ReturnCallback
- false则直接丢弃消息
修改consumer服务中的application.yml:改为自己的虚拟机IP
spring:
rabbitmq:
host: 192.168.200.130 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /
定义Return回调:
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
作用: 如果消息没有到达队列,会执行回调方法
修改publisher服务,添加一个ReturnCallback:
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
//ApplicationContextAware: 在Spring容器(Bean工厂)创建好的时候,通知咱们
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback,先用匿名内部类
rabbitTemplate.setReturnCallback(
(message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败(没有到达队列),记录日志
log.error("消息队列接收失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
//rabbitTemplate.convertAndSend(exchange, routingKey, message);
});
}
}
定义Confirm回调:
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
//uuid, 雪花算法
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送到交换机成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息,其中"simple.test"是路由key
rabbitTemplate.convertAndSend(
"amq.topic", "simple.test", message, correlationData);
// 休眠一会儿,等待ack回执
//如果不休眠,程序就直接结束了;RabbitMQ服务器就无法回调咱们写的代码
Thread.sleep(2000);
}
登录到MQ的管理端:
# 测试:
1、发送到一个不存在的交换机:camq.topic
rabbitTemplate.convertAndSend(
"camq.topic", "simple.test", message, correlationData);
//查看日志:会有一个没有到达交换机的信息
2、发送到一个已经存在的交换机:amq.topic(系统自带的),但没有绑定指定的路由
rabbitTemplate.convertAndSend(
"amq.topic", "simple.test", message, correlationData);
//查看日志:没有路由到队列
3、通过管理端指定amq.topic交换机的路由key到simple.queue
rabbitTemplate.convertAndSend(
"amq.topic", "simple.test", message, correlationData);
//成功发送,需要到管理端查看一下队列中是否有消息
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,
也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
# 交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
//durable: 持久化
return new DirectExchange("simple.direct", true, false);
//默认创建就是持久化的交换机
//return new DirectExchange("simple.direct");
}
提示:由SpringAMQP声明的交换机都是持久化的
可以在RabbitMQ控制台看到持久化的交换机都会带上`D`的标示:
# 队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
//return new Queue("simple.queue");
}
提示:由SpringAMQP声明的交换机都是持久化的
# 消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
- 非持久化:MessageDeliveryMode.NON_PERSISTENT
- 持久化:MessageDeliveryMode.PERSISTENT
用java代码指定:
@Test
public void testDurableMessage() {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //设置消息的属性:持久化
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
提示:由SpringAMQP声明的交换机都是持久化的
1.3.消费者消息确认
RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回
执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
/********************************************************************************
而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在处理完消息后,调用api发送ack【麻烦,一般不使用】。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后肯定会成功处理,因此消息投递后立即被删除
由此可知:
- manual:自己根据业务情况,判断什么时候该ack(太麻烦,不使用)
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- none模式下,消息投递是不可靠的,可能丢失(不适合用在项目中)
因此,我们都是使用默认的auto即可。
# none模式
修改consumer服务的application.yml文件,添加下面内容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消费者接收到simple.queue的消息:【{}】", msg);
// 模拟异常 ->给MQ返回nack
System.out.println(1 / 0);
log.debug("消息成功处理完成!");
}
// 测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了
# auto模式
# 再次把确认机制修改为auto:
spring:
rabbitmq:
listener:
simple:
#消费成功,返回ack
#消费失败,返回nack
acknowledge-mode: auto # 根据异常自动ack
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
1.4.消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,
无限循环,导致mq的消息处理飙升,带来不必要的压力:
怎么办呢?
# 本地重试
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry: #本地重试
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数,包含服务器推送的第一次
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了(RejectAndDontRequeue),说明最后SpringAMQP返回的是ack,
mq删除消息了
reject: 拒绝
don't re queue: 不要重新放到队列
# 结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack给MQ服务器(reject+ not re queue),消息会被丢弃
失败策略:
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,会有MessageRecoverer接口来处理,
它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:本地重试耗尽后,返回nack,消息重新入队,重新推送消息
- RepublishMessageRecoverer【最优方法】:重试耗尽后,将失败消息投递到指定的交换机(后续人工介入来处理)
处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,
后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//TODO 指定失败处理策略
}
/**************************************************************************************************/
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean //非常特殊,方法上有@Bean,方法中所有的参数自动就有一个@Autowired
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
//最终效果:将重试失败的消息重新发送到指定的交换机+路由key
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
/*********************************************************************************************************/
完整代码:
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean //修改本地重试耗尽之后,消息处理策略:把消息发到指定的交换+key
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
1.5.总结
如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达交换机和队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败本地重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,
- 交由人工处理
2.死信交换机
2.1.认识死信
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费【利用此机制,实现延迟消息】
- 要投递的队列消息满了,无法投递
如果一个消息被消费者拒绝了,变成了死信:
如果这个包含死信的队列配置了`dead-letter-exchange`属性,指定了一个交换机,那么队列中的死信就会投递到
这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个只存放死信的队列:
因为simple.queue绑定了死信交换机 dl.direct,并且设置了路由key,因此死信最终会经过死信交换机路由给死信队列。
- 指定死信交换机名称:dl.direct
- 指定死信交换机与死信队列绑定的RoutingKey:dl
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
> 下边代码只是为了演示,对应图片中的配置,不用添加到项目中
@Bean
public Queue simpleQueue(){
//return new Queue("simple.queue");
return QueueBuilder
.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") //指定路由key
.build();
}
# 总结:
什么样的消息会成为死信?
- 消息被消费者reject或者返回nack,并且设置了requeue=false
- 消息超时未消费
- 队列满了
死信交换机的使用场景是什么?
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
2.2.TTL(过期时间)
TTL,也就是Time-To-Live(过期时间)。如果一个队列中的消息TTL结束仍未消费,则会变为死信。
TTL超时分为两种情况:
- 消息本身设置了超时时间
- 消息所在的队列设置了超时时间
> 思考:为什么要给消息或者队列设置过期时间呢?
> 目的:实现延迟任务的功能
> 比如要实现如下功能:
>
> - 延迟10分钟发送短信给用户(ttl = 10分钟)
> - 用户下单,如果用户在15 分钟内未支付,则自动取消
> - 预约工作会议,20分钟后自动通知所有参会人员
# 创建死信交换机
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明死信交换机 dl.direct、
死信队列 dl.queue:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.queue的延迟消息:{}", msg);
}
# 声明队列,指定超时时间
在consumer服务中新建TTLMessageConfig,创建ttl队列:
- 设置超时时间:ttl(10000)
- 指定死信交换机:deadLetterExchange("dl.direct")
- 指定死信的路由key:deadLetterRoutingKey("dl")
package cn.itcast.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLMessageConfig {
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl")
.build();
}
/**
* 声明交换机,将ttl队列与交换机绑定
*/
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl");
}
}
在publisher服务中发送消息:
@Test
public void testTTLQueue() {
// 创建消息
String message = "hello, ttl queue";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 记录日志
log.debug("发送消息成功");
}
注意:先启动消费者,再发送消息
执行完之后观察时间戳,可以看到消息发送与接收之间的时差大概是10秒。
# 发送消息时,设定TTL
在发送消息时,也可以指定TTL:
@Test
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
//setex : set expire
.setExpiration("5000") //设置过期时间
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}
查看发送消息日志:
接收消息日志:
这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。
# 总结
消息超时的两种方式是?
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
2.3.延迟交换机插件
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了DelayExchange插件,原生支持延迟队列效果。
参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
# 使用DelayExchange
插件的使用也非常简单:
- 声明一个交换机,交换机的类型可以是任意类型
- 设定delayed属性为true
- 声明队列与其绑定
# 1)声明DelayExchange交换机
基于注解方式【常用】:
注意:如果MQ容器没有安装DelayExchange插件,直接指定delayed=true,启动项目时会报错
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key="delay"
))
public void listenDelayedQueue(String msg){
log.info("接受到 delay.queue的延迟消息: {}",msg);
}
在consumer服务的SpringRabbitListener中添加:
> 优势:代码简单
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息{}", msg);
}
第二种方式:也可以基于@Bean的方式:
> 优势:清晰明了
# 2)发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间:
@Test
public void testDelayedMsg(){
Message message = MessageBuilder
.withBody("hello,delayed message",getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",10000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.random.UUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
log.debug("发送消息成功");
}
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, delayed messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 10000) //时间必须是数字,不能是字符串
.build();
// 2.准备CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}
此时,idea控制台会有一个报错信息:
原因很简单,在之前课程中我们添加了定义发送者Return回调,如果消息发送之后没有到达队列就会报错。
当使用插件发送消息时设置了x-delay=10000,那消息只要没有到过期时间,就不会路由到队列中,
而是存在一个叫Mnesia的分布式数据库管理系统中。
因此需要在publisher服务的CommonConfig中,判断是否为延迟消息:
// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
# 总结
延迟队列插件的使用步骤包括哪些?
- 声明一个交换机,添加delayed属性为true
- 发送消息时,添加x-delay头,值为超时时间(必须是int值)
3.惰性队列
# 消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,
直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有三种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池多线程处理,加快消息处理速度
- 惰性队列:扩大队列容积,提高堆积上限
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存,缺点:速度会变慢
- 消费者要消费消息时才会从磁盘中读取并加载到内存,最终推送给消费者
- 支持数百万条的消息存储
3.1.基于命令行设置lazy-queue
注:(本操作是Linux操作系统进行的)
设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
可以通过命令行将一个运行中的队列修改为惰性队列:
#进入MQ容器
docker exec -it mq bash
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
- `rabbitmqctl` :RabbitMQ的命令行工具
- `set_policy` :添加一个策略
- `Lazy` :策略名称,可以自定义
- `"^lazy-queue$"` :用正则表达式匹配队列的名字
- `'{"queue-mode":"lazy"}'` :设置队列模式为lazy模式
- `--apply-to queues `:策略的作用对象,是所有的队列
3.2.@Bean声明lazy-queue
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy()//开启x-queue-mode为lazy
.build();
}
package cn.itcast.mq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LazyConfig {
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() //指定是惰性队列
.build();
}
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue").build();
}
}
重启cousumer服务:确认已经创建了以上两个队列:
3.3.注解声明LazyQueue
此处没有给队列绑定交换机,因此使用的是queuesToDeclare = ?,而不是bindings = ?
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息:{}", msg);
}
3.4.测试
先把cousumer服务停掉,不然发送的消息都被消费掉了,无法观察效果
1、在publisher服务的SpringAmqpTest中发送消息到惰性队列:
@Test
public void testLazyQueue() throws InterruptedException {
long b = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("lazy.queue", message);
}
long e = System.currentTimeMillis();
System.out.println(e - b);
}
发现消息都在磁盘中:
2、也可以发送到普通队列,做为对比:
@Test
public void testNormalQueue() throws InterruptedException {
long b = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, Spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2.发送消息
rabbitTemplate.convertAndSend("normal.queue", message);
}
long e = System.currentTimeMillis();
System.out.println(e - b);
}
发现消息都在内存中:
# 总结
消息堆积问题的解决方案?
- 队列上绑定多个消费者,提高消费速度
- 在消费者内开启线程池多线程处理,加快消息处理速度
- 使用惰性队列,可以再mq中保存更多消息
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
4.MQ集群
4.1.集群分类
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。
RabbitMQ的集群有两种模式:
- 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
- 镜像集群:是一种主从集群,在普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。
因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保
主从的数据一致性。
4.2.普通集群
# 集群结构和特征
普通集群,或者叫标准集群(classic cluster),具备下列特征:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
结构如图:
4.3.镜像集群
# 集群结构和特征
镜像集群:本质是主从模式,具备下面的特征:
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的**主节点,**备份到的其它节点叫做该队列的**镜像**节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
结构如图:
4.4.仲裁队列
# 集群特征
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。
仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,底层采用Raft协议确保主从的数据一致性,
具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
Java代码创建仲裁队列
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue2") // 持久化
//.layzy() //惰性队列
.quorum() // 仲裁队列
.build();
}
# SpringAMQP连接MQ集群
注意,这里用address来代替host、port方式
spring:
rabbitmq:
#host: 192.168.200.130
#port: 5672
addresses: 192.168.200.130:8071, 192.168.200.130:8072, 192.168.200.130:8073
username: itcast
password: 123321
virtual-host: /
注意:因为重新创建的3个MQ集群,还没有安装延迟队列插件,因此原来练习延迟队列的代码需要注释掉:
1、创建交换机时
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息{}", msg);
}
/******************************************************************************************/
2、发送消息时
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.准备消息
Message message = MessageBuilder
.withBody("hello, delayed messsage".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", 10000)
.build();
// 2.准备CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}