当前位置: 首页 > article >正文

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

  • 问题发现
  • 问题解决
    • 方法一:只监听死信队列,在死信队列里面处理业务逻辑
    • 方法二:修改预取值

问题发现

最近再学习RabbitMQ过程中,看到关于死信队列内容:

来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange

  1. 使用 basic.rejectbasic.nackrequeue 参数设置为 false 的使用者否定该消息
  2. 消息由于每条消息的 TTL 而过期
  3. 队列超出了长度限制

之前的文章里面有讲解过TTL过期后不进入死信队列的疑惑和解决办法,然后上手去实践另一个死信队列的方法,结果又是一道坑等着我,示例代码如下:

默认自动应答模式

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue(){
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二
        return  QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .maxLength(5) // 设置队列最大长度为5
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
}

监听普通队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("收到消息:"+msg);
    }
}

监听死信队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("死信队列收到消息:{}",msg);
    }
}

发送方发送10条消息,示例代码如下:

@Component
public class MQSender {
    @Autowired
    private RabbitTemplate template;

    public void send() {
        for (int i = 0; i < 10; i++) {
            String msg = "hello world_"+i;
            template.convertAndSend("normal.exchange", "normal", msg);
        }
    }
}

然后调用send()方法,执行结果如图:

在这里插入图片描述

按道理会将队列前面的5条消息进入死信队列,然后剩下的五条消息正常消费才对,我们检查一下队列是否设置成功,如图所示

在这里插入图片描述
设置没有问题,那就很懵逼了。。。

问题解决

我们先在页面上向普通队交换机发送10条消息,然后查看它的状态,如图所示:
在这里插入图片描述
超过5条消息就会放入死信队列中,如图所示:

.
然后再看一下,默认行为是从队列前面删除或死信消息,如图所示:

在这里插入图片描述
我们可以看到普通队列存放的是最后5条消息,前面的5条消息进入死信队列。也就是说,再没有进入普通消费者之前会将队列前面删除或死信消息(进入消费者之前将消息进行分配)。

方法一:只监听死信队列,在死信队列里面处理业务逻辑

这种做法也是网上大多数文章的一种处理方法(另外一种情况就是进入普通消费者,还没被消费完的情况下,消费者挂了,然后队列就会重新分配,将从队列前面删除或者进入死信队列),如图所示:

在这里插入图片描述

但是这些做法都是基于没有普通消费者监听的情况下进行的,感觉和我理解的略有偏差(应该是再有普通消费者监听和死信队列监听的情况下,发送消息时会对消息进行分配处理)。

发送方代码和配置的代码就不重复展示了(参考之前示例),监听死信队列(自动确认模式和手动确认模式都一样),示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg,Message message,Channel channel) throws IOException {
        log.info("死信队列收到消息:{}",msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
然后再死信队列里面处理业务逻辑即可。

方法二:修改预取值

再监听普通队列时抛异常或者手动拒绝重新进入队列,这两种方式都不能达到我想要的效果,我发现只要有普通消费方监听就会进入消费,然后我就在想是不是因为预期值的问题导致,可能我消息数量太少了,然后默认预期值太高导致消息直接进入消费,然后将预取值改为1自动确认模式,示例代码如下:

spring.rabbitmq.listener.simple.prefetch=1

发送方代码和配置的代码就不重复展示了(参考之前示例),监听普通队列,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitHandler
    public void receive(String msg) throws InterruptedException {
        log.info("收到消息:"+msg);
        // 模拟业务处理队列等待场景
        Thread.sleep(10000);
    }
}

监听死信队列,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg) {
        log.info("死信队列收到消息:{}",msg);

    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
当然有的时候也不一定完全按照你设置的最大长度进入死信队列,有的时候消费速度太快(队列的第一个已经被消费了的情况),得看实际情况,至少可以确保再设置了大于队列最大长度时是可以正常进入死信队列的。归根结底:消息数量太少了

另外我们再来介绍一下溢出方式,一般默认情况下溢出方式为:drop-head(从队列前面删除或者进入死信队列),除此之外还有两种:

  • reject-publishreject-publish-dlx:最近发布的消息将被丢弃。reject-publishreject-publish-dlx 之间的区别在于 reject-publish-dlx 也是死信拒绝消息。

将配置的溢出模式改为reject-publish,示例代码如下:

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue() {
//          方法一
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
//        normalQueue.addArgument("x-overflow","reject-publish");//最近发布的消息将被丢弃
//        方法二
        return  QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .maxLength(5) // 设置队列最大长度为5
                .overflow(QueueBuilder.Overflow.dropHead)
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述
由图可以,死信队列并不会受到消息。

然后我们再来看看将溢出模式设置为reject-publish-dlxQueueBuilder.Overflow没有该参数,手动定义),示例代码如下:

@Configuration
public class MQConfig {
    //忽略死信队列创建和绑定过程,参考前面示例...

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue() {
        Queue normalQueue = new Queue("normal_queue");
        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
        normalQueue.addArgument("x-max-length", 5);//设置队列最大长度
        normalQueue.addArgument("x-overflow","reject-publish-dlx"); //最近发布的消息进入死信队列
        return normalQueue;
    }
    //忽略普通队列创建过程,参考前面示例...
}

重新创建队列,调用send()方法,如图所示:

在这里插入图片描述

如果你有更好的方法或者不同的理解,欢迎评论区交流。


http://www.kler.cn/a/315814.html

相关文章:

  • PHP 循环控制结构深度剖析:从基础到实战应用
  • 工业 4G 路由器赋能远程医疗,守护生命线
  • JS爬虫实战演练
  • spring boot发送邮箱,java实现邮箱发送(邮件带附件)3中方式【保姆级教程一,代码直接用】
  • 周邦彦,北宋文坛的独特乐章
  • 前端工程化4:从0到1构建完整的前端监控平台
  • 自动化生成与更新 Changelog 文件
  • 花生壳、神卓互联等主流内网穿透技术分享
  • FTP服务
  • 编译 Android 11源码
  • 人工智能(AI)的影响下人类的生活样子
  • Shell 脚本学习
  • STM32 单片机最小系统全解析
  • Vue子组件样式受到父组件污染
  • 【C++11】异常处理
  • 【嵌入式】操作系统相关概念
  • 中序遍历二叉树全过程图解
  • 关于ClickHouse建表 集群表 SQL
  • GitHub 上高星 AI 开源项目推荐
  • QT For Android开发-打开PPT文件
  • 如何备份SqlServer数据库
  • Lua中..和...的使用区别
  • Oracle 启动动态采样 自适应执行计划
  • 计算机毕业设计Python深度学习房价预测 房价可视化 链家爬虫 房源爬虫 房源可视化 卷积神经网络 大数据毕业设计 机器学习 人工智能 AI