当前位置: 首页 > article >正文

RabbitMQ—保障消费者的可靠性和机制与策略

在企业级应用中,确保消息队列(MQ)消费者的可靠性和效率是至关重要的。这不仅影响到数据的一致性和准确性,也直接关系到用户体验和业务流程的顺畅运行。保障消费者的可靠性不仅仅是技术层面的问题,更是关乎企业能否实现高效、稳定运营的关键因素。通过实施有效的机制和策略,不仅可以提升系统的整体性能,还能增强客户满意度,促进业务的持续发展。

目录

消费者确认机制

失败重试机制

失败处理策略

业务幂等性


问题:RabbitMQ如何得知消费者的处理状态

解决:消费者处理消息时的可靠性解决方案

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制。

。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

ack:成功处理消息,RabbitMQ从队列中删除该消息

nack:消息处理失败,RabbitMQ需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

实现方式:SpringAMQP帮我们实现了消息确认,并允许我们通过配置文件设置ACK处理方式,有三种模式:

none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack

如果是消息处理或校验异常,自动返回reject;

实现配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息处理完成");
}

实现效果:符合自动模式的预期结果。

失败重试机制

问题:当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。(往往的投递速度达到上千次/秒)

解决方案:Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。(和发送者重试机制类似)

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

实现效果如下:

消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

对于消息可靠性要求不高的业务场景下,这样就达到预期了。

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由程序猿集中处理。(接下来演示这个

在consumer服务中创建一个ErrorMessageConfiguration配置类

定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

实现效果如下:

实现解读:实现了第三种方式,将失败后将消息投递到error.queue队列中。

注意:方式2比方式1好.(方式1是默认的,方式二的投递速度大概只有1次/几秒)

业务幂等性

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

对于新增,查询,删除来说都是具备业务幂等性的。

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。

在实际业务场景中,由于意外经常会出现业务被重复执行(服务间调用的重试,MQ消息的重复投递等等)

场景描述假设你在一个电子商务平台上购买了一件商品,并通过第三方支付网关完成了支付。支付成功后,支付系统向MQ发送一条消息通知交易服务进行订单状态更新(例如从“待支付”更新为“已支付”)。然而,由于网络不稳定或MQ系统的短暂故障,这条消息被重复投递了两次。

引发后果:

如果交易服务没有实现幂等性检查,在接收到第一条消息时,它会正常地将订单状态更新为“已支付”,并完成库存扣减和发货准备。

当第二条重复的消息到达时,由于缺乏幂等性控制,交易服务再次尝试执行相同的更新操作。这可能导致重复扣减库存,甚至更严重的是,如果支付流程涉及到直接与银行接口交互,则可能引发重复扣款的问题,给用户带来不必要的经济损失。

解决方案:

方案一:唯一消息ID(给消息id,使它具有辨识性)

方案二:业务状态判断(业务状态的改变的合理性)

方案一实现思路:

SpringAMQP的MessageConverter自带了MessageID的功能。

即我们可以在消息经过途径中加入消息id。(例如前面实现Jackson的消息转换器)

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

方案二实现思路:

基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

上述案例:处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

当然还有其他实现方案,例如:乐观锁机制,补偿机制等等。


http://www.kler.cn/a/568082.html

相关文章:

  • 【Vue教程】使用Vite快速搭建前端工程化项目 Vue3 Vite Node.js
  • 怎么写C#命令行参数程序,及控制台带参数案例(程序完整源码)下载
  • 【造个轮子】使用Golang实现简易令牌桶算法
  • 数据库测试
  • 蓝桥杯---归并排序算法题目(leetcode第912题)
  • 【SQL】MySQL中的字符串处理函数:concat 函数拼接字符串,COALESCE函数处理NULL字符串
  • 供应链管理系统--升鲜宝门店收银系统功能解析,登录、主界面、会员 UI 设计图(一)
  • 构建神经网络之常用pandas(补充中 )
  • JEEWMS departController.do存在SQL注入(DVB-2025-8837)
  • 【Python爬虫(93)】爬虫项目的安全防线:审计与合规攻略
  • Cocos Creator3.8.6拖拽物体的几种方式
  • java23种设计模式-备忘录模式
  • 本地部署阿里的万象2.1文生视频(Wan2.1-T2V-1.3B)模型
  • 【文献阅读】A Survey Of Resource-Efficient LLM And Multimodal Foundation Models
  • 前端开发核心知识点深度解析:从CSS到Vue的全面指南
  • 力扣hot100——回溯
  • DeepSeek 助力 Vue3 开发:打造丝滑的网格布局(Grid Layout)
  • Angular学习笔记90: 浏览器兼容性问题
  • 泛微e-office index.php sql注入漏洞复现(CNVD-2022-2)(附脚本)
  • 58、深度学习-自学之路-自己搭建深度学习框架-19、RNN神经网络梯度消失和爆炸的原因(从公式推导方向来说明),通过RNN的前向传播和反向传播公式来理解。