当前位置: 首页 > article >正文

RabbitMQ消费者确认和重复机制

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

        ack:成功处理消息,RabbitMQ从队列中删除该消息

        nack:消息处理失败,RabbitMQ需要再次投递消息

        reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

   none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

   manual手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

         auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack

如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring: 
  rabbitmq:
    listener: 
      simple: 
        acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

重复机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力,为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次,本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject。


http://www.kler.cn/a/303158.html

相关文章:

  • 浅谈云计算20 | OpenStack管理模块(下)
  • 仿射密码实验——Python实现(完整解析版)
  • HTML中link的用法
  • 80_Redis内存策略
  • 【Kafka】Linux+KRaft集群部署指南
  • 探索 Transformer²:大语言模型自适应的新突破
  • Python爬虫案例七:抓取南京公交信息数据并将其保存成excel多表形式
  • EXCEL导出功能——相关报错
  • 微信小程序开发注意事项
  • 通过mqtt通信远程控制大疆无人机
  • Java 学习路线:语言、框架、中间件与数据库
  • 【RabbitMQ】RabbitMQ如何保证数据的可靠性,RabbitMQ如何保证数据不丢失,数据存储
  • 手机玩机常识-------诺基亚系列机型3/5/6/7/8详细的刷机教程步骤 手机参考救砖刷机教程
  • Linux+Docker:3分钟实现MinIO在线部署与Java集成
  • 性能测试的复习3-jmeter的断言、参数化、提取器
  • 240909-ChuanhuChatGPT集成Ollama的环境配置
  • 卷积神经网络经典模型架构简介
  • 中国电子学会202406青少年软件编程(Python)等级考试试卷(三级)真题与解析
  • Linux(5)--CentOS8使用yum
  • 【Vue】- Vue表达式
  • 【漏洞复现】科荣AIO moffice Sql注入漏洞
  • 【HarmonyOS】Beta最新对外版本IDE下载和环境配置
  • 如何在Windows10系统安装docker?
  • Springboot引入通义千文大模型API
  • 前端代码规范- Commit 提交规范
  • linux-L10.查看你硬盘容量