当前位置: 首页 > article >正文

【RabbitMQ】应用问题

RabbitMQ 应用问题

  • 1. 幂等性保障
    • 1.1 幂等性介绍
    • 1.2 解决⽅案
      • 全局唯⼀ID
      • 业务逻辑判断
  • 2. 顺序性保障
    • 2.1 顺序性保障介绍
    • 2.2 顺序性保障⽅案
  • 3. 消息积压问题
    • 3.1 原因分析
    • 3.2 解决⽅案

1. 幂等性保障

1.1 幂等性介绍

幂等性是数学和计算机科学中某些运算的性质, 它们可以被多次应⽤, ⽽不会改变初始应⽤的结果.

应⽤程序的幂等性介绍

在应⽤程序中, 幂等性就是指对⼀个系统进⾏重复调⽤(相同参数), 不论请求多少次, 这些请求对系统的影响都是相同的效果.

⽐如数据库的 select 操作. 不同时间两次查询的结果可能不同, 但是这个操作是符合幂等性的. 幂等性指的是对资源的影响, ⽽不是返回结果. 查询操作对数据资源本⾝不会产⽣影响, 之所以结果不同, 可能是因为两次查询之间有其他操作对资源进⾏了修改.

⽐如 i++ 这个操作, 就是⾮幂等性的. 如果调⽤⽅没有控制好逻辑, ⼀次流程重复调⽤好⼏次, 结果就会不同.

MQ的幂等性介绍

对于MQ⽽⾔, 幂等性是指同⼀条消息, 多次消费, 对系统的影响是相同的.

⼀般消息中间件的消息传输保障分为三个层级

  1. At most once:最多⼀次. 消息可能会丢失,但绝不会重复传输
  2. At least once:最少⼀次. 消息绝不会丢失,但可能会重复传输.
  3. Exactly once:恰好⼀次. 每条消息肯定会被传输⼀次且仅传输⼀次.

RabbitMQ⽀持"最多⼀次"和"最少⼀次".

对于"恰好⼀次", ⽬前RabbitMQ还做不到, 不仅是RabbitMQ, ⽬前市⾯上主流的消息中间件, 都做不到这⼀点.

在业务使⽤中, 对于可靠性要求⽐较⾼的场景, 建议使⽤"最少⼀次", 以防⽌消息丢失. “最多⼀次” 会因为消息发送过程中, ⽹络问题, 消费出现异常等种种原因, 导致消息丢失.

以下场景可能会导致消息发送重复(包含但不限于)
发送时消息重复: 当⼀条消息已被成功发送到服务端并完成持久化, 此时出现了⽹络闪断或者客⼾端宕机, 导致服务端对客⼾端应答失败. 如果此时Producer意识到消息发送失败并尝试再次发送消息, Consumer后续会收到两条内容相同并且Message ID也相同的消息.
投递时消息重复: 消息消费的场景下, 消息已投递到Consumer并完成业务处理, 当客⼾端给服务端反馈应答的时候⽹络闪断. 为了保证消息⾄少被消费⼀次, 云消息队列 RabbitMQ 版的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息, Consumer后续会收到两条内容相同并且Message ID也相同的消息.
在这里插入图片描述
但是"最少⼀次", 就会造成⼀个问题, 消费端会收到重复的消息, 也会造成对同⼀条消息进⾏多次处理. ⼀些不重要的业务还好⼀点, 对于重要的业务, 如果不对重复的消息进⾏处理, 会造成严重事故.

⽐如: 当⽤⼾对⼀个订单付款之后, 因为⽹络问题, 付款成功的结果未返回给订单系统, 当⽤⼾再次点击付款时, 如果系统未做幂等性处理, 那就会造成两次扣款

1.2 解决⽅案

MQ消费者的幂等性的解决⽅法, ⼀般有以下⼏种:

全局唯⼀ID

  1. 为每条消息分配⼀个唯⼀标识符, ⽐如UUID或者MQ消息中的唯⼀ID,但是⼀定要保证唯⼀性.
  2. 消费者收到消息后, 先⽤该id判断该消息是否已经消费过, 如果已经消费过, 则放弃处理
  3. 如果未消费过, 消费者开始消费消息, 业务处理成功后, 把唯⼀ID保存起来(数据库或Redis等)

可以使⽤Redis 的原⼦性操作setnx来保证幂等性, 将唯⼀ID作为key放到redis中 (SETNX messageID 1) . 返回1, 说明之前没有消费过, 正常消费. 返回0, 说明这条消息之前已消费过, 抛弃.

业务逻辑判断

在业务逻辑层⾯实现消息处理的幂等性.

例如: 通过检查数据库中是否已存在相关数据记录, 或者使⽤乐观锁机制来避免更新已被其他事务更改的数据, 再或者在处理消息之前, 先检查相关业务的状态, 确保消息对应的操作尚未执⾏, 然后才进⾏处理, 具体根据业务场景来处理

2. 顺序性保障

2.1 顺序性保障介绍

消息的顺序性是指消费者消费的消息和⽣产者发送消息的顺序是⼀致的.

⽐如⽣产者发送的消息分别是msg1, msg2, msg3, 那么消费者也是按照msg1, msg2, msg3的顺序进⾏消费的.

很多业务场景下, 消息的消费是不⽤保证顺序的, ⽐如使⽤MQ实现订单超时的处理. 但有些业务场景, 可能存在多个消息顺序处理的情况. ⽐如⽤⼾信息修改, 对同⼀个⽤⼾的同⼀个资料进⾏修改, 需要保证消息的顺序.

⼀些资料显⽰RabbitMQ的消息能够保障顺序性, 这是不严谨的. 在不考虑消息丢失, ⽹络故障等异常的情况下, 如果只有⼀个消费者, 最好也只有⼀个⽣产者的情况下, 是可以保证消息的顺序性. 如果有多个⽣产者同时发送消息, ⽆法确定消息到达RabbitMQ Broker的前后顺序, 也就⽆法验证消息的顺序性.

哪些情况可能会打破RabbitMQ的顺序性呢? 下⾯介绍⼏种常⻅的场景:

  1. 多个消费者: 当队列配置了多个消费者时, 消息可能会被不同的消费者并⾏处理, 从⽽导致消息处理的顺序性⽆法保证.
  2. ⽹络波动或异常: 在消息传递过程中, 如果出现⽹络波动或异常, 可能会导致消息确认(ACK) 丢失, 从⽽使得消息被重新⼊队和重新消费, 造成顺序性问题.
  3. 消息重试:如果消费者在处理消息后未能及时发送确认, 或者确认消息在传输过程中丢失, 那么MQ可能会认为消息未被成功消费⽽进⾏重试, 这也可能导致消息处理的顺序性问题.
  4. 消息路由问题: 在复杂的路由场景中, 消息可能会根据路由键被发送到不同的队列, 从⽽⽆法保证全局的顺序性.
  5. 死信队列: 消息因为某些原因(如消费端拒绝消息)被放⼊死信队列, 死信队列被消费时, ⽆法保证消息的顺序和⽣产者发送消息的顺序⼀致

包括但不仅限于以上⼏种情形会使RabbitMQ消息错序, 如果要保证消息的顺序性, 需要业务⽅使⽤RabbitMQ之后做进⼀步的处理

2.2 顺序性保障⽅案

消息顺序性保障分为: 局部顺序性保证和全局顺序性保证.

局部顺序性通常指的是在单个队列内部保证消息的顺序. 全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.

在实际应⽤中, 全局顺序性很难实现, 可以考虑使⽤业务逻辑来保证顺序性, ⽐如在消息中嵌⼊序列号,并在消费端进⾏排序处理. 相对⽽⾔, 局部顺序性更常⻅, 也更容易实现

RabbitMQ作为⼀个分布式消息队列, 主要优化的是吞吐量和可⽤性, ⽽不是严格的顺序性保证. 如果业务场景确实需要严格的消息顺序, 可能需要在应⽤层⾯进⾏额外的设计和实现.

接下来说⼀下消息的顺序性保证的常⻅策略.

  1. 单队列单消费者

最简单的⽅法是使⽤单个队列, 并由单个消费者进⾏处理. 同⼀个队列中的消息是先进先出的, 这是RabbitMQ来帮助我们保证的

  1. 分区消费

单个消费者的吞吐太低了, 当需要多个消费者以提⾼处理速度时, 可以使⽤分区消费. 把⼀个队列分割成多个分区, 每个分区由⼀个消费者处理, 以此来保持每个分区内消息的顺序性

⽐如⽤⼾修改资料后, 发送⼀条⽤⼾资料消息. 消费者在处理时, 需要保证消息发送的先后顺序但这种场合并不需要保证全局顺序. 只需要保证同⼀个⽤⼾的消息顺序消费就可以. 这时候就可以采⽤把消费按照⼀定的规则, 分为多个区, 每个分区由⼀个消费者处理.
RabbitMQ本⾝并不⽀持分区消费, 需要业务逻辑去实现, 或者借助spring-cloud-stream来实现

  1. 消息确认机制

使⽤⼿动消息确认机制, 消费者在处理完⼀条消息后, 显式地发送确认, 这样RabbitMQ才会移除并继续发送下⼀条消息.

  1. 业务逻辑控制

在某些情况下, 即使消息乱序到达, 也可以在业务逻辑层⾯实现顺序控制. ⽐如通过在消息中嵌⼊序列号, 并在消费时根据这些信息来处理

RabbitMQ本⾝并不保证全局的严格顺序性, 特别是在分布式系统中. 在实际应⽤开发中, 根据具体的业务需求, 可能需要结合多种策略来实现所需要的顺序保证.

3. 消息积压问题

3.1 原因分析

消息积压是指在消息队列(如RabbitMQ)中, 待处理的消息数量超过了消费者处理能⼒, 导致消息在队列中不断堆积的现象.

通常有以下⼏种原因:

  1. 消息⽣产过快: 在⾼流量或者⾼负载的情况下, ⽣产者以极⾼的速率发送消息, 超过了消费者的处理能⼒.
  2. 消费者处理能⼒不⾜: 消费者处理处理消息的速度跟不上消息⽣产的速度, 也会导致消息在队列中积压
    可能原因有:
    1. 消费端业务逻辑复杂, 耗时⻓
    2. 消费端代码性能低
    3. 系统资源限制, 如CPU、内存、磁盘I/O等也会限制消费者处理消息的效率.
    4. 异常处理处理不当. 消费者在处理消息时出现异常, 导致消息⽆法被正确处理和确认.
  3. ⽹络问题: 因为⽹络延迟或不稳定, 消费者⽆法及时接收或确认消息, 最终导致消息积压
  4. RabbitMQ 服务器配置偏低

消息积压可能会导致系统性能下降, 影响⽤⼾体验, 甚⾄导致系统崩溃. 因此, 及时发现消息积压并解决对于维护系统稳定性⾄关重要.

3.2 解决⽅案

遇到消息积压时, ⾸先要分析消息积压造成的原因. 根据原因来调整策略

主要从以下⼏个⽅⾯来解决:

  1. 提⾼消费者效率

    a. 增加消费者实例数量, ⽐如新增机器
    b. 优化业务逻辑, ⽐如使⽤多线程来处理业务
    c. 设置prefetchCount, 当⼀个消费者阻塞时, 消息转发到其他未阻塞的消费者.
    d. 消息发⽣异常时, 设置合适的重试策略, 或者转⼊到死信队列

  2. 限制⽣产者速率. ⽐如流量控制, 限流算法等

    a. 流量控制: 在消息⽣产者中实现流量控制逻辑, 根据消费者处理能⼒动态调整发送速率
    b. 限流: 使⽤限流⼯具, 为消息发送速率设置⼀个上限
    c. 设置过期时间. 如果消息过期未消费, 可以配置死信队列, 以避免消息丢失, 并减少对主队列的压⼒

  3. 资源与配置优化. ⽐如升级RabbitMQ服务器的硬件, 调整RabbitMQ的配置参数等


http://www.kler.cn/a/316008.html

相关文章:

  • JVM实战—12.OOM的定位和解决
  • Ubuntu问题 -- 硬盘存储不够了, 如何挂载一个新的硬盘上去, 图文简单明了, 已操作成功
  • pytest 参数介绍
  • 【入门级】计算机网络学习
  • 简聊MySQL的顺序读写和随机读写
  • 高通,联发科(MTK)等手机平台调优汇总
  • 【c++】介绍
  • 数据结构与算法——Java实现 11.习题——有序链表去重
  • 深度优先搜索算法及其matlab程序详解
  • [大语言模型-论文精读] 以《黑神话:悟空》为研究案例探讨VLMs能否玩动作角色扮演游戏?
  • ubuntu+MobaXterm+ssh+运行Qt(成功版)
  • Zotero(7.0.5)+123云盘同步空间+Z-library=无限存储文献pdf/epub电子书等资料
  • 【C++驾轻就熟】模板
  • JVM的CMS、G1以及ZGC对比
  • RS®AREG100A 汽车电子雷达回波发生器
  • 后端-项目创建与sql
  • Request 跨线程访问问题
  • 屋顶气膜网球馆:智慧城市资源利用之道—轻空间
  • STM32 的 SDIO 接口(基于STM32F429HAL库)
  • 考题抄错会做也白搭——模板方法模式
  • h5dump用法详解
  • 乐观锁、悲观锁及死锁
  • 【机器学习】---神经架构搜索(NAS)
  • 【tomcat】tomcat学习笔记
  • 垃圾邮件检测_TF-IDF分析,聚类分析与朴素贝叶斯
  • spring springboot 日志框架