当前位置: 首页 > article >正文

死信交换机,延迟队列和惰性队列

目录

死信交换机

什么是死信交换机

死信交换机和自定义的失败策略指定的错误消息交换机不一样

 创建死信交换机接收死信

总结 

TTL 

​编辑 设置延时接收消息

1.设置有超时时间的消息队列,并绑定死信交换机

2.发送消息到设置超时时间的消息队列中

3.消费者监听死信消息队列

4.启动

5.也可以在发送消息时设置过期时间,与消息队列设置的超时时间相比,哪一个时间短就是哪个

总结

延迟队列

 安装插件

第一步:找到mq挂载数据卷的目录

第二步:把插件放到这个目录里面

第三步:安装插件 

第四步:验证

DelayExchange原理

 使用DelayExchange

1)编写使用配置类

2)设置消费者监听 

 3)发送消息,给消息的x-delay请求头设置延时时间

 4)结果

总结

惰性队列 

基于@Bean声明lazy-queue 

基于@RabbitListener声明LazyQueue 

总结


死信交换机

什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto #自动发送消息给mq消息是否正常消费
        retry: #消息失败后本地的重试策略
          enabled: true
          initial-interval: 1000
          multiplier: 3 
          max-attempts: 4 #重试次数

被拒绝的原因:监听消息队列的方法执行指定的次数(4次)之后还是出现异常这个消息就会被拒绝(默认的消息失败策略)

 因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

 如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

 

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

 

死信交换机和自定义的失败策略指定的错误消息交换机不一样

死信交换机会存放被消费者拒绝的消息,过期的消息,还有消息队列满无法存放的消息

而自定义的失败策略指定的错误消息交换机只会存放被消费者拒绝的消息

    //设置消息失败处理策略
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

 创建死信交换机接收死信

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

 编写代码:

@Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("mqAd.topic",true,false);
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("mqAd.simple.queue")//设置持久化并取名
                .deadLetterExchange("dl.direct")//设置绑定的死信交换机的名字
                .deadLetterRoutingKey("dl")//设置死信交换机的路由
                .build();
    }
    //声明死信交换机
    @Bean
    public DirectExchange dlExchange(){
        return new DirectExchange("dl.direct");
    }
    //声明存放死信的消息队列
    @Bean
    public Queue dlQueue(){
        return new Queue("dl.queue");
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(simpleQueue()).to(topicExchange()).with("simple");
    }

注意:需要把我们自定义的消息失败策略的bean给注释掉,不然会把被消费者拒绝的消息回到我们自定义的错误消息队列中,而不是到死信交换机中

 //设置消息失败处理策略
    /*@Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }*/

 启动项目,发现被消费者拒绝的消息成功发送到与死信交换机绑定的死信消息队列中,而不是直接删除此消息

总结 

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

TTL 

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

 设置延时接收消息

1.设置有超时时间的消息队列,但是不设置监听方法来监听这个消息队列

2.将这个消息队列绑定到一个死信交换机,因为超时时间的消息队列没有消费者消费,就会过期,然后过期的消息就会到死信交换机中

3.设置消费者监听死信交换机,这样一来就实现了延时接收消息的功能

1.设置有超时时间的消息队列,并绑定死信交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("mqAd.topic",true,false);
    }
    //设置有过时时间的消息队列
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                .ttl(10000)//设置过期时间为10s
                .deadLetterExchange("dl.direct")//设置绑定的死信交换机的名字
                .deadLetterRoutingKey("dl")//设置死信交换机的路由
                .build();
    }
    //声明死信交换机
    @Bean
    public DirectExchange dlExchange(){
        return new DirectExchange("dl.direct");
    }
    //声明存放死信的消息队列
    @Bean
    public Queue dlQueue(){
        return new Queue("dl.queue");
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");
    }
    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(ttlQueue()).to(topicExchange()).with("ttl");
    }
2.发送消息到设置超时时间的消息队列中
    @Test
    public void test02() {
        Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8)).build();
        rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
        log.info("发送消息的时间为:{}", LocalDateTime.now());
    }
3.消费者监听死信消息队列
  @RabbitListener(queues = "dl.queue")
    public void listen(String msg){
        log.info("接收到的消息为:{},时间:{}",msg,LocalDateTime.now());
    }
4.启动

可以发现相差了10s

 

5.也可以在发送消息时设置过期时间,与消息队列设置的超时时间相比,哪一个时间短就是哪个
    @Test
    public void test02() {
        Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")//设置这个消息的过期时间为5秒
                .build();
        rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
        log.info("发送消息的时间为:{}", LocalDateTime.now());

可以发现差了5s 

 

总结

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机
  • 将消费者监听的队列绑定到死信交换机
  • 发送消息时给消息设置超时时间为20秒

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多且使用TTL配置死信交换机需要配置的内容太多(死信交换机,才有超时时间的消息队列),所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

 这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:Community Plugins | RabbitMQ

 安装插件

第一步:找到mq挂载数据卷的目录

 

第二步:把插件放到这个目录里面

 

第三步:安装插件 

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令: 

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

第四步:验证

交换机出现x-delayed延迟类型

 

DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息的请求头是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,就持久化到硬盘,先不放到消息队列中,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,才投递消息到指定队列

 使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。 

1)编写使用配置类
@Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder.directExchange("delay.direct")
                .durable(true)//设置持久化
                .delayed()//设置为延迟交换机
                .build();
    }
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable("delay.queue")
                .build();
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange()).with("delay");
    }
2)设置消费者监听 
  @RabbitListener(queues = "delay.queue")
    public void listenDelayMsg(String msg){
        log.info("接收到的消息为:{},时间:{}",msg,LocalDateTime.now());
    }
 3)发送消息,给消息的x-delay请求头设置延时时间
    @Test
    public void test03() {
        Message message = MessageBuilder.withBody("hhh".getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay",10000)//给这个消息设置延时时间,10s
                .build();
        rabbitTemplate.convertAndSend("mqAd.topic", "ttl", message);
        log.info("发送消息的时间为:{}", LocalDateTime.now());
    }
 4)结果

会触发消息回执方法,没有路由到消息队列,因为这是一个延时消息,延时交换机会把这个消息先写到磁盘上,而不是直接写到消息队列,所以会触发这个方法,因此可以忽略不记。

 

使用注解设置延时交换机

总结

延迟队列插件的使用步骤包括哪些?

•声明一个交换机,添加delayed属性为true

•发送消息时,添加x-delay头,值为超时时间

惰性队列 

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

基于@Bean声明lazy-queue 

 

基于@RabbitListener声明LazyQueue 

 

普通的消息队列会先把接收到的消息放到内存中,然后每间隔一段时间,都会触发page-out把消息存到磁盘中,如果消息过多,同时操作内存和磁盘开销很大。

而惰性队列只会把接收到的消息存到磁盘中,没有间歇性的page-out。 

总结

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 使用惰性队列,可以在mq中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

http://www.kler.cn/a/391647.html

相关文章:

  • linux自动分区后devmappercentos-home删除后合并到其它分区上
  • 【MySQL】SQL菜鸟教程(一)
  • 用python编写一个放烟花的小程序
  • TPS61022 PFM的机制以及TPS61xxx转换器的PFM与PWM之间的负载阈值
  • 计算机组成原理(1)
  • LKT4304新一代算法移植加密芯片,守护物联网设备和云服务安全
  • 电脑监控如何多画面显示?3大方法带你玩转多屏一画,实现管理效率翻倍涨!
  • Mac 安装protobuf2.5.0
  • C++(Qt)软件调试---静态分析工具cppcheck(22)
  • LLMs之Code:Github Spark的简介、安装和使用方法、案例应用之详细攻略
  • C# DataTable使用Linq查询详解
  • 2024最新版JavaScript逆向爬虫教程-------基础篇之Proxy与Reflect详解
  • 知识见闻 - 苹果手机拨号键长按
  • C# 字典应用
  • CTF-RE 从0到N: windows反调试-获取Process Environment Block(PEB)信息来检测调试
  • 时间序列数据结构、持久数据结构详细解读
  • DHCP与FTP
  • 【设计模式】行为型模式(一):模板方法模式、观察者模式
  • 番外篇 | 关于YOLO11算法的改进点总结
  • 计算机毕业设计Python+大模型动漫推荐系统 动漫视频推荐系统 机器学习 协同过滤推荐算法 bilibili动漫爬虫 数据可视化 数据分析 大数据毕业设计
  • Pytorch实现运动鞋识别
  • KPI绩效系统源码,java版医院绩效核算系统源码,科室绩效考核、奖金核算、审核、发放一体化
  • GIS前后端分离项目展示
  • 电商系统开发:Spring Boot框架的实践
  • Day09 C++ 存储类
  • ubuntu2204部署RAGFlow(非docker)