死信交换机,延迟队列和惰性队列
目录
死信交换机
什么是死信交换机
死信交换机和自定义的失败策略指定的错误消息交换机不一样
创建死信交换机接收死信
总结
TTL
编辑 设置延时接收消息
1.设置有超时时间的消息队列,并绑定死信交换机
2.发送消息到设置超时时间的消息队列中
3.消费者监听死信消息队列
4.启动
5.也可以在发送消息时设置过期时间,与消息队列设置的超时时间相比,哪一个时间短就是哪个
总结
延迟队列
安装插件
第一步:找到mq挂载数据卷的目录
第二步:把插件放到这个目录里面
第三步:安装插件
第四步:验证
DelayExchange原理
使用DelayExchange
1)编写使用配置类
2)设置消费者监听
3)发送消息,给消息的x-delay请求头设置延时时间
4)结果
总结
惰性队列
基于@Bean声明lazy-queue
基于@RabbitListener声明LazyQueue
总结
死信交换机
什么是死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
如图,一个消息被消费者拒绝了,变成了死信:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto #自动发送消息给mq消息是否正常消费
retry: #消息失败后本地的重试策略
enabled: true
initial-interval: 1000
multiplier: 3
max-attempts: 4 #重试次数
被拒绝的原因:监听消息队列的方法执行指定的次数(4次)之后还是出现异常这个消息就会被拒绝(默认的消息失败策略)
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
死信交换机和自定义的失败策略指定的错误消息交换机不一样
死信交换机会存放被消费者拒绝的消息,过期的消息,还有消息队列满无法存放的消息
而自定义的失败策略指定的错误消息交换机只会存放被消费者拒绝的消息
//设置消息失败处理策略
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
创建死信交换机接收死信
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
编写代码:
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("mqAd.topic",true,false);
}
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("mqAd.simple.queue")//设置持久化并取名
.deadLetterExchange("dl.direct")//设置绑定的死信交换机的名字
.deadLetterRoutingKey("dl")//设置死信交换机的路由
.build();
}
//声明死信交换机
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct");
}
//声明存放死信的消息队列
@Bean
public Queue dlQueue(){
return new Queue("dl.queue");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(simpleQueue()).to(topicExchange()).with("simple");
}
注意:需要把我们自定义的消息失败策略的bean给注释掉,不然会把被消费者拒绝的消息回到我们自定义的错误消息队列中,而不是到死信交换机中
//设置消息失败处理策略
/*@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}*/
启动项目,发现被消费者拒绝的消息成功发送到与死信交换机绑定的死信消息队列中,而不是直接删除此消息
总结
什么样的消息会成为死信?
- 消息被消费者reject或者返回nack
- 消息超时未消费
- 队列满了
死信交换机的使用场景是什么?
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
TTL
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
设置延时接收消息
1.设置有超时时间的消息队列,但是不设置监听方法来监听这个消息队列
2.将这个消息队列绑定到一个死信交换机,因为超时时间的消息队列没有消费者消费,就会过期,然后过期的消息就会到死信交换机中
3.设置消费者监听死信交换机,这样一来就实现了延时接收消息的功能
1.设置有超时时间的消息队列,并绑定死信交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("mqAd.topic",true,false);
}
//设置有过时时间的消息队列
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)//设置过期时间为10s
.deadLetterExchange("dl.direct")//设置绑定的死信交换机的名字
.deadLetterRoutingKey("dl")//设置死信交换机的路由
.build();
}
//声明死信交换机
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct");
}
//声明存放死信的消息队列
@Bean
public Queue dlQueue(){
return new Queue("dl.queue");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(ttlQueue()).to(topicExchange()).with("ttl");
}
2.发送消息到设置超时时间的消息队列中
@Test
public void test02() {
Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
log.info("发送消息的时间为:{}", LocalDateTime.now());
}
3.消费者监听死信消息队列
@RabbitListener(queues = "dl.queue")
public void listen(String msg){
log.info("接收到的消息为:{},时间:{}",msg,LocalDateTime.now());
}
4.启动
可以发现相差了10s
5.也可以在发送消息时设置过期时间,与消息队列设置的超时时间相比,哪一个时间短就是哪个
@Test
public void test02() {
Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")//设置这个消息的过期时间为5秒
.build();
rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
log.info("发送消息的时间为:{}", LocalDateTime.now());
可以发现差了5s
总结
消息超时的两种方式是?
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多且使用TTL配置死信交换机需要配置的内容太多(死信交换机,才有超时时间的消息队列),所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:Community Plugins | RabbitMQ
安装插件
第一步:找到mq挂载数据卷的目录
第二步:把插件放到这个目录里面
第三步:安装插件
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
第四步:验证
交换机出现x-delayed延迟类型
DelayExchange原理
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息的请求头是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,就持久化到硬盘,先不放到消息队列中,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,才投递消息到指定队列
使用DelayExchange
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
1)编写使用配置类
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder.directExchange("delay.direct")
.durable(true)//设置持久化
.delayed()//设置为延迟交换机
.build();
}
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("delay.queue")
.build();
}
@Bean
public Binding binding(){
return BindingBuilder.bind(delayQueue())
.to(delayExchange()).with("delay");
}
2)设置消费者监听
@RabbitListener(queues = "delay.queue")
public void listenDelayMsg(String msg){
log.info("接收到的消息为:{},时间:{}",msg,LocalDateTime.now());
}
3)发送消息,给消息的x-delay请求头设置延时时间
@Test
public void test03() {
Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",10000)//给这个消息设置延时时间,10s
.build();
rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
log.info("发送消息的时间为:{}", LocalDateTime.now());
}
4)结果
会触发消息回执方法,没有路由到消息队列,因为这是一个延时消息,延时交换机会把这个消息先写到磁盘上,而不是直接写到消息队列,所以会触发这个方法,因此可以忽略不记。
使用注解设置延时交换机
总结
延迟队列插件的使用步骤包括哪些?
•声明一个交换机,添加delayed属性为true
•发送消息时,添加x-delay头,值为超时时间
惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
基于@Bean声明lazy-queue
基于@RabbitListener声明LazyQueue
普通的消息队列会先把接收到的消息放到内存中,然后每间隔一段时间,都会触发page-out把消息存到磁盘中,如果消息过多,同时操作内存和磁盘开销很大。
而惰性队列只会把接收到的消息存到磁盘中,没有间歇性的page-out。
总结
消息堆积问题的解决方案?
- 队列上绑定多个消费者,提高消费速度
- 使用惰性队列,可以在mq中保存更多消息
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO