当前位置: 首页 > article >正文

【RabbitMQ】重试机制、TTL

重试机制

在消息从Broker到消费者的传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题都可能导致消息处理失败。为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送。

但如果是程序逻辑引起的错误,那么多次重试也是不起作用的,因此设置了重试次数。

消费者确认机制为AUTO时

当消费者确认机制是AUTO时,如果程序逻辑错误,那么就会不断重试,造成消息积压。因此我们就需要设置重试次数,当多次重试还是失败,消息就会被自动确认,自然消息就会丢失。

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败等待时长
          max-attempts: 5 # 最大重试次数
@Configuration
public class RetryConfig {

    @Bean("retryQueue")
    public Queue retryQueue() {
        return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
    }

    @Bean("retryExchange")
    public Exchange retryExchange() {
        return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();
    }

    @Bean("retryQueueBind")
    public Binding retryQueueBind(@Qualifier("retryExchange") Exchange exchange,
                                  @Qualifier("retryQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
    }

}
@RestController
@RequestMapping("/retry")
public class RetryController {

    @Resource
    public RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void retryQueue() {
        this.rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "hello 重试机制");
        System.out.println("重试机制生产者发送消息成功");
    }

}
@Configuration
public class RetryListener {

    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void retryListener(String msg) {
        System.out.println("获取到消息:" + msg);
        int n = 3 / 0;
    }

}

上述代码和可靠性传输一文的消费者确认机制中策略为AUTO的代码类似,只不过在此配置文件中加了一个重试机制。当启动程序之后,可以看到如下结果:

5a184519433e4cdbb4e9104af3dfc725.png

重试时:

d489014923f34f40b0df4b4c9c1bea83.png 

重试结束之后: 

06595feb6ba149afa9b9477d21e456cb.png 

 从测试结果可以看出,当消费者确认机制的策略为AUTO时,遇到异常就会进行重试,当重试结束之后依然没有接收时,就会自动确认消息。

消费者确认机制为MANAUL时

当消费者确认机制是MANUL时,修改消费者代码,并启动程序,查看结果:

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制,手动确认
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败等待时长
          max-attempts: 5 # 最大重试次数
@Configuration
public class RetryListener {

    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void retryListener(Message msg, Channel channel) throws IOException {
        try {
            System.out.println("接收到消息:" + msg);
            int num = 3 / 0; // 模拟处理失败
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
        }
    }

}

a12c8ed185c8460899ded1f1709ee811.png

从测试结果可以得出,消费者确认机制为手动确认时,并不会依据配置文件中的重试次数等内容来做,而是依据消息者自身的代码实现来做实现机制。原因是因为手动确认模式下,消费者需要显示地对消息进行确认,如果消费者在消息处理过程中遇到异常,可以选择确认不确定消息,也可以选择重新入队。所以重试的控制权并不在应用程序本身,而在于代码逻辑本身。 

1. 消费者确认机制为AUTO时,如果程序逻辑异常,多次重试还是失败。那么消息就会自动确认,进而消息就会丢失。

2. 消费者确认机制为MANAUL时,如果程序逻辑异常,多次重试依然处理失败,无法被确认,消息就会积压。

3. 消费者确认机制为NONE时,不管发生什么情况,当消息从Broker内部发出时,就会自动确认,因此它不存在任何内容。

TTL

TTL,过期时间。当消息到达过期时间之后,还没有被消息,消息就会被自动清除。

RabbitMQ可以对队列和消息设置过期时间。如果两种方法同时使用,那么就以两者较小的值为准。

设置消息的TTL

@Configuration
public class TllConfig {

    @Bean("ttlQueue")
    public Queue ttlQueue() {
        return QueueBuilder.durable(Constants.TTL_QUEUE).build();
    }

    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
    }

    @Bean("ttlQueueBind")
    public Binding ttlQueueBind(@Qualifier("ttlExchange") Exchange exchange,
                                @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }

}
@RestController
@RequestMapping("/ttl")
public class TtlController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void ttlQueue() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("50000");
                return message;
            }
        };
        this.rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "hello ttl", messagePostProcessor);
    }

}

 在TTL的测试中,不需要消费者的存在,否则看不到消息在队列中的自动丢失。

设置队列的TTL

注意,设置队列的TTL,并不是过期之后删除整个队列,也是关于消息设置的。只不过投递到该消息队列的所有消息都有一个共同的过期时间而已。

@Configuration
public class TllConfig {

    @Bean("ttlQueue")
    public Queue ttlQueue() {
        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).build();
    }

    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();
    }

    @Bean("ttlQueueBind")
    public Binding ttlQueueBind(@Qualifier("ttlExchange") Exchange exchange,
                                @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }

}

设置队列的TTL,只需要在声明队列时给出过期时间即可。在测试的过程中,如果是先测试了消息的过期时间,那么在测试队列时,需要先将持久化的队列给删除,再启动程序。

当启动程序之后,可以看到队列上加了一个TTL的标识,表明队列的过期时间设置成功:

16dfa1734e5e43c88a5b678d594a20b1.png 

区别

设置队列的过期时间,一旦消息过期,就会从队列中删除。

设置消息的过期时间,即使消息过期,如果消息不在队首,还得等到消息到达队首之后才会进行判定是否过期。如果过期,那就删除,反之就投递到相应的消费者中。

为什么这两种方法处理的方式不一样?

因为设置队列的过期时间,那么队列中过期的消息一定在队首,RabbitMQ只需要定期从队首扫描消息是否有过期的消息即可。

而设置消息的过期时间,每条消息的过期时间都不一致,如果要删除队列的所有过期消息那么就要扫描整个队列,所以不如等到消息要进行投递时再判断消息是否过期,这样可以减少一定的资源消耗。

 


http://www.kler.cn/news/311397.html

相关文章:

  • hku-mars雷达相机时间同步方案-软件驱动(MID360与海康MV-CB060-10UMUC-S)
  • 2-99 基于matlab多尺度形态学提取眼前节组织
  • 3 种自然语言处理(NLP)技术:RNN、Transformers、BERT
  • 0.5.4 知识库管理微调
  • Linux云计算 |【第四阶段】NOSQL-DAY1
  • C#和数据库高级:抽象类和抽象方法
  • kafka 一步步探究消费者组与分区分配策略
  • Reactor介绍,如何从简易版本的epoll修改成Reactor模型(demo版本代码+详细介绍)
  • YOLOv5/v8 + 双目相机测距
  • 学习大数据DAY58 增量抽取数据表
  • JavaWeb项目打包、部署至Tomcat并启动的全程指南(图文详解)
  • saltstack远程执行
  • 基于SpringBoot+Vue+MySQL的热门网络游戏推荐系统
  • 【网站架构部署与优化】web服务与http协议
  • 十大排序算法的特点及应用场景
  • 英飞凌最新AURIX™TC4x芯片介绍
  • kafka原理剖析及实战演练
  • MySQL-binlog、redolog和undolog的区别
  • android BLE 蓝牙的连接(二)
  • AI生成内容:优点与缺点
  • Docker实操:安装MySQL5.7详解(保姆级教程)
  • 【软考】数据字典(DD)
  • 游戏、网关等服务借助Docker容器化并使用Kubernetes部署、更新等
  • MySQL 中的 EXPLAIN 命令:洞察查询性能的利器
  • MySQL 中的索引覆盖扫描:加速查询的秘密武器
  • 【Linux】Ubuntu 22.04 shell实现MySQL5.7 tar 一键安装
  • 独立站技能树之建站33项自检清单 1.0丨出海笔记
  • STM32 HAL freertos零基础(十一)中断管理
  • Linux技术04-IPVS
  • 游戏如何对抗定制挂