当前位置: 首页 > article >正文

RabbitMQ 高级特性——重试机制

在这里插入图片描述

文章目录

  • 前言
  • 重试机制
    • 配置文件设置
    • 生命交换机、队列和绑定关系
    • 生产者发送消息
    • 消费消息

前言

前面我们学习了 RabbitMQ 保证消息传递可靠性的机制——消息确认、持久化和发送发确认,那么对于消息确认和发送方确认,如果接收方没有收到消息,那么这时该怎么做呢?这就要提到 RabbitMQ 的重试机制了,当发送方发送的消息没有成功到达接收方,那么发送方就会使用重试机制来向接收方重新发送消息,如果接收方因为程序逻辑错误引起的,那么不管发送方重新发送多少次,接收方都不能正确的处理这个消息,那么发送方也不能无休止的向接收方发送消息,当发送的次数超过某个数量的时候,那么这个消息就会被标记为死信,然后被处理掉。那么这篇文章将详细的说明一下 RabbitMQ 的重试机制。

重试机制

RabbitMQ的重试机制是一种强大的功能,它允许在消息处理失败时自动重试,从而提高系统的可靠性和稳定性。

重试机制的触发条件

  1. 消息发送失败:当消息发送到RabbitMQ服务器时,如果因为网络问题、认证失败或其他原因导致发送失败,RabbitMQ会尝试重试发送。
  2. 消息消费失败:当消费者从队列中获取消息并尝试处理时,如果因为某种原因(如业务逻辑错误、系统异常等)导致处理失败,RabbitMQ会根据预设的重试策略进行重试。

我们都知道 Spring AMQP 提供了四种确认策略:NONE、MANUAL、AUTO。当确认策略为 NONE 的时候队列不管这个消息是否被成功处理都会将这个消息从队列中删除,而确认策略为 MANUAL 的时候则更多的靠程序本身处理,那么重试机制更多的使用在确认策略为 AUTO 的情况下。

配置文件设置

spring:
  rabbitmq:
    addresses: amqp://admin:admin@x.x.x.x:5672/test
    listener:
      simple:
        acknowledge-mode: auto # 确认模式
        retry:
          enabled: true # 开始重试机制
          initial-interval: 5000ms # 每次重试的间隔时间
          max-attempts: 5 # 最大重试次数

生命交换机、队列和绑定关系

public class Constants {
    public static final String RETRY_EXCHANGE = "retry.exchange";
    public static final String RETRY_QUEUE = "retry.queue";
}
@Configuration
public class RabbitConfig {
    @Bean("retryExchange")
    public DirectExchange retryExchange() {
        return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();
    }

    @Bean("retryQueue")
    public Queue retryQueue() {
        return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
    }

    @Bean("retryBinding")
    public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("retry");
    }
}

生产者发送消息

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RequestMapping("/retry")
    public String retry() {
        rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","rabbitmq retry");
        return "消息发送成功";
    }
}

消费消息

@Component
public class RetryListener {
    @RabbitListener(queues = Constants.RETRY_QUEUE)
    public void listener(Message message, Channel channel) {
        System.out.println("接收到消息:" + message + channel);
        //制造出异常使得消费者不能正常处理消息
        int ret = 3/0;
        System.out.println("消息处理成功");
    }
}

我们先将重试机制的配置给注释掉,然后运行之后会发现消费者会重复读取一条消息并且一直抛异常,这是因为:

由于监听器方法抛出了异常并且没有被捕获,Spring AMQP将不会发送ACK,RabbitMQ将不断重新投递这条消息给消费者,直到它最终被确认、被拒绝(并可能发送到死信队列)或队列被删除。

那么我们配置重试机制的重试次数之后再看看什么效果:

在这里插入图片描述

可以发现,当配置了重试次数之后,包括第一次投递消息,队列一共给消费者发送了五次消息,并且这个五次的消息的 deliveryTag 是一样的,也就说明是队列的重试机制投递的,五次之后队列便不再向该消费者发送该消息,这样就避免了因消费者的程序逻辑问题而导致队列无休止的向消费者发送消息的问题,并且还保证了消息传递的可靠性。

那么我们将消息的确认策略更换为 MANUAL 手动确认的方式,然后看看这个重试机制的配置会不会生效:

@RabbitListener(queues = Constants.RETRY_QUEUE)
public void listener(Message message, Channel channel) throws IOException, InterruptedException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        System.out.println("接收到消息:" + message + channel);
        //制造出异常使得消费者不能正常处理消息
        int ret = 3/0;
        System.out.println("消息处理成功");
        channel.basicAck(deliveryTag,false);
    } catch (Exception e) {
        Thread.sleep(1000);
        //第三个参数requeue表示被拒绝的消息是否重新进入队列,true表示重新进入队列并且重新投递这个消息,否则直接丢弃
        channel.basicNack(deliveryTag,false,true);
    }
}

在这里插入图片描述
发现当确认策略为 manual 手动确认的时候,我们的重试机制的配置是不生效的,并且可以发现我们的消息的 deliveryTag 是不断递增的,也就是说之前拒绝的消息每次都会重新入队列然后重新投递,跟重试机制是不一样的。

而如果我们 basicNack 方法的第三个参数 requeue 参数的值为 fasle 时候,那么这个被拒绝的消息就会被丢弃或者投递到的死信队列中。

使用重试机制时需要注意:

  1. 自动确认模式下:程序逻辑出现异常,即使多次重试还是失败,消息也会被自动确认,那么这条消息就会丢失。

  2. 在手动确认模式下,消费者需要显式地发送ACK(Acknowledgment)或NACK来告诉RabbitMQ消息是否已被成功处理。如果发生异常且没有发送ACK,消息将保持为unacked状态,这允许RabbitMQ重新将消息发送给其他消费者(如果配置了多个消费者)或根据队列的其他配置(如死信队列)来处理未确认的消息。然而,如果所有消费者都无法处理该消息,它将导致消息在队列中积压,除非配置了适当的超时机制或死信队列来处理这些长时间未确认的消息。


http://www.kler.cn/a/323516.html

相关文章:

  • 图像处理技术椒盐噪声
  • 计算机网络 (4)计算机网络体系结构
  • windows tomcat 报错后如何让窗口不闪退
  • Photoshop(PS)——人像磨皮
  • Flink Job更新和恢复
  • Flutter实现绝对定位学习
  • 【前端面试题】Vue 3 生命周期钩子的执行顺序详解
  • 2024年中国电子学会青少年软件编程(Python)等级考试(二级)核心考点速查卡
  • Supervisor进程管理工具安装
  • Python爬虫获取指定内容
  • django drf 统一处理操作人和时间字段
  • leetcode-238. 除自身以外数组的乘积-前n项的思想
  • 一键降重:芝士AI如何简化论文查重过程?
  • 05-成神之路_ambari_Ambari实战-013-代码生命周期-metainfo-configFiles详解
  • 【第十六章:Sentosa_DSML社区版-机器学习之生存分析】
  • sql server每天定时执行sql语句
  • 【Python快速学习笔记01】下载解释器/环境变量配置/PyCharm下载/第一个代码
  • 浅谈软件安全开发的重要性及安全开发实践
  • NSSCTF [SWPUCTF 2021 新生赛]非常简单的逻辑题
  • CodeFormer模型构建指南
  • 网络安全TARA分析
  • [Linux]磁盘分区指令
  • 带你0到1之QT编程:二十、QT与MySQL喜结连理,构建数据库应用开发
  • 大数据电商数仓项目--实战(一)数据准备
  • WebGIS开发及市面上各种二三维GIS开发框架对比分析
  • libreoffice word转pdf