当前位置: 首页 > article >正文

RabbitMQ的应用问题

一、幂等性保障

幂等性是数学和计算机科学中某些运算的性质, 它们可以被多次应⽤, ⽽不会改变初始应⽤的结果

数学上的幂等性:

f(x)=f(f(x))   |x|

数据库操作幂等性:

       数据库的 select 操作. 不同时间两次查询的结果可能不同, 但是这个操作是符合幂等性的. 幂等性指的是对资源的影响, ⽽不是返回结果. 查询操作对数据资源本⾝不会产⽣影响, 之所以结果不同, 可能是因为两次查询之间有其他操作对资源进⾏了修改. 

应用上的幂等性:

       在应⽤程序中, 幂等性就是指对⼀个系统进⾏重复调⽤(相同参数), 不论请求多少次, 这些请求对系统的影响都是相同的效果

非幂等性:⽐如  i++ 这个操作, 就是⾮幂等性的. 如果调⽤⽅没有控制好逻辑, ⼀次流程重复调⽤好⼏次, 结果就会不同.

MQ的幂等性:

       对于MQ⽽⾔, 幂等性是指同⼀条消息, 多次消费, 对系统的影响是相同的.⼀般消息中间件的消息传输保障分为三个层级.
1. At most once:最多⼀次. 消息可能会丢失,但绝不会重复传输.
2. At least once:最少⼀次. 消息绝不会丢失,但可能会重复传输.
3. Exactly once:恰好⼀次. 每条消息肯定会被传输⼀次且仅传输⼀次.
RabbitMQ⽀持"最多⼀次"和"最少⼀次".
对于"恰好⼀次", ⽬前RabbitMQ还做不到, 不仅是RabbitMQ, ⽬前市⾯上主流的消息中间件, 都做不到这⼀点.

对于可靠性要求⽐较⾼的场景, 建议使⽤"最少⼀次", 以防⽌消息丢失. "最多⼀次" 会因为消息发送过程中, ⽹络问题, 消费出现异常等种种原因, 导致消息丢失.
以下场景可能会导致消息发送重复(包含但不限于)
 ①发送时消息重复: 当⼀条消息已被成功发送到服务端并完成持久化, 此时出现了⽹络闪断或者客⼾端宕机, 导致服务端对客⼾端应答失败. 如果此时Producer意识到消息发送失败并尝试再次发送消息, Consumer后续会收到两条内容相同并且Message ID也相同的消息.
② 投递时消息重复: 消息消费的场景下, 消息已投递到Consumer并完成业务处理, 当客⼾端给服务端反馈应答的时候⽹络闪断. 为了保证消息⾄少被消费⼀次, 云消息队列 RabbitMQ 版的服务端在将⽹络恢复后再次尝试投递之前已被处理过的消息, Consumer后续会收到两条内容相同并且Message ID也相同的消息.

但是"最少⼀次", 就会造成⼀个问题, 消费端会收到重复的消息, 也会造成对同⼀条消息进⾏多次处理. ⼀些不重要的业务还好⼀点, 对于重要的业务, 如果不对重复的消息进⾏处理, 会造成严重事故

MQ消费者的幂等性的解决⽅法, ⼀般有以下⼏种:

全局唯⼀ID
1. 为每条消息分配⼀个唯⼀标识符, ⽐如UUID或者MQ消息中的唯⼀ID,但是⼀定要保证唯⼀性.
2. 消费者收到消息后, 先⽤该id判断该消息是否已经消费过, 如果已经消费过, 则放弃处理.
3. 如果未消费过, 消费者开始消费消息, 业务处理成功后, 把唯⼀ID保存起来(数据库或Redis等)
可以使⽤Redis 的原⼦性操作setnx来保证幂等性, 将唯⼀ID作为key放到redis中 (SETNX
messageID 1) . 返回1, 说明之前没有消费过, 正常消费. 返回0, 说明这条消息之前已消费过, 抛
弃.

业务逻辑判断
在业务逻辑层⾯实现消息处理的幂等性.
例如: 通过检查数据库中是否已存在相关数据记录, 或者使⽤乐观锁机制来避免更新已被其他事务更改的数据, 再或者在处理消息之前, 先检查相关业务的状态, 确保消息对应的操作尚未执⾏, 然后才进⾏处理, 具体根据业务场景来处理

二、顺序性保障

消息的顺序性是指消费者消费的消息和⽣产者发送消息的顺序是⼀致的.(⽐如⽣产者发送的消息分别是msg1, msg2, msg3, 那么消费者也是按照msg1, msg2, msg3的顺序进⾏消费的.)

很多业务场景下, 消息的消费是不⽤保证顺序的, ⽐如使⽤MQ实现订单超时的处理. 但有些业务场景, 可能存在多个消息顺序处理的情况. ⽐如⽤⼾信息修改, 对同⼀个⽤⼾的同⼀个资料进⾏修改, 需要保证消息的顺序.

⼀些资料显⽰RabbitMQ的消息能够保障顺序性, 这是不严谨的. 在不考虑消息丢失, ⽹络故障等异常的情况下, 如果只有⼀个消费者, 最好也只有⼀个⽣产者的情况下, 是可以保证消息的顺序性. 如果有多个⽣产者同时发送消息, ⽆法确定消息到达RabbitMQ Broker的前后顺序, 也就⽆法验证消息的顺序性.哪些情况可能会打破RabbitMQ的顺序性呢? 下⾯介绍⼏种常⻅的场景:
1. 多个消费者: 当队列配置了多个消费者时, 消息可能会被不同的消费者并⾏处理, 从⽽导致消息处理的顺序性⽆法保证.
2. ⽹络波动或异常: 在消息传递过程中, 如果出现⽹络波动或异常, 可能会导致消息确认(ACK) 丢失, 从⽽使得消息被重新⼊队和重新消费, 造成顺序性问题.
3. 消息重试:如果消费者在处理消息后未能及时发送确认, 或者确认消息在传输过程中丢失, 那么MQ可能会认为消息未被成功消费⽽进⾏重试, 这也可能导致消息处理的顺序性问题.
4. 消息路由问题: 在复杂的路由场景中, 消息可能会根据路由键被发送到不同的队列, 从⽽⽆法保证全局的顺序性.
5. 死信队列: 消息因为某些原因(如消费端拒绝消息)被放⼊死信队列, 死信队列被消费时, ⽆法保证消息的顺序和⽣产者发送消息的顺序⼀致

顺序保障方案:

消息顺序性保障分为: 局部顺序性保证和全局顺序性保证.

局部顺序性通常指的是在单个队列内部保证消息的顺序. 全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.

注:①在实际应⽤中, 全局顺序性很难实现, 可以考虑使⽤业务逻辑来保证顺序性, ⽐如在消息中嵌⼊序列号,并在消费端进⾏排序处理. 相对⽽⾔, 局部顺序性更常⻅, 也更容易实现.
②RabbitMQ作为⼀个分布式消息队列, 主要优化的是吞吐量和可⽤性, ⽽不是严格的顺序性保证.如 果业务场景确实需要严格的消息顺序, 可能需要在应⽤层⾯进⾏额外的设计和实现

顺序性保证的常⻅策略

1. 单队列单消费者
最简单的⽅法是使⽤单个队列, 并由单个消费者进⾏处理. 同⼀个队列中的消息是先进先出的, 这是
RabbitMQ来帮助我们保证的.
2. 分区消费
单个消费者的吞吐太低了, 当需要多个消费者以提⾼处理速度时, 可以使⽤分区消费. 把⼀个队列分割成多个分区, 每个分区由⼀个消费者处理, 以此来保持每个分区内消息的顺序性(⽐如⽤⼾修改资料后, 发送⼀条⽤⼾资料消息. 消费者在处理时, 需要保证消息发送的先后顺序,但这种场合并不需要保证全局顺序. 只需要保证同⼀个⽤⼾的消息顺序消费就可以. 这时候就可以采⽤把消费按照⼀定的规则, 分为多个区, 每个分区由⼀个消费者处理
RabbitMQ本⾝并不⽀持分区消费, 需要业务逻辑去实现, 或者借助spring-cloud-stream来实现)    3. 消息确认机制
使⽤⼿动消息确认机制, 消费者在处理完⼀条消息后, 显式地发送确认, 这样RabbitMQ才会移除并继续发送下⼀条消息.
4. 业务逻辑控制
在某些情况下, 即使消息乱序到达, 也可以在业务逻辑层⾯实现顺序控制. ⽐如通过在消息中嵌⼊序列号, 并在消费时根据这些信息来处理

三、消息挤压问题

原因分析
消息积压是指在消息队列(如RabbitMQ)中, 待处理的消息数量超过了消费者处理能⼒, 导致消息在队列中不断堆积的现象.

通常有以下⼏种原因:
1. 消息⽣产过快: 在⾼流量或者⾼负载的情况下, ⽣产者以极⾼的速率发送消息, 超过了消费者的处理能⼒.
2. 消费者处理能⼒不⾜: 消费者处理处理消息的速度跟不上消息⽣产的速度, 也会导致消息在队列中积压.
可能原因有:
1) 消费端业务逻辑复杂, 耗时⻓
2) 消费端代码性能低
3) 系统资源限制, 如CPU、内存、磁盘I/O等也会限制消费者处理消息的效率.
4) 异常处理处理不当. 消费者在处理消息时出现异常, 导致消息⽆法被正确处理和确认.
3. ⽹络问题: 因为⽹络延迟或不稳定, 消费者⽆法及时接收或确认消息, 最终导致消息积压
4. RabbitMQ 服务器配置偏低
消息积压可能会导致系统性能下降, 影响⽤⼾体验, 甚⾄导致系统崩溃. 因此, 及时发现消息积压并解决对于维护系统稳定性⾄关重要.

解决⽅案
遇到消息积压时, ⾸先要分析消息积压造成的原因. 根据原因来调整策略.
主要从以下⼏个⽅⾯来解决:
1. 提⾼消费者效率
a. 增加消费者实例数量, ⽐如新增机器
b. 优化业务逻辑, ⽐如使⽤多线程来处理业务
c. 设置prefetchCount, 当⼀个消费者阻塞时, 消息转发到其他未阻塞的消费者.
d. 消息发⽣异常时, 设置合适的重试策略, 或者转⼊到死信队列
2. 限制⽣产者速率. ⽐如流量控制, 限流算法等.
a. 流量控制: 在消息⽣产者中实现流量控制逻辑, 根据消费者处理能⼒动态调整发送速率
b. 限流: 使⽤限流⼯具, 为消息发送速率设置⼀个上限
c. 设置过期时间. 如果消息过期未消费, 可以配置死信队列, 以避免消息丢失, 并减少对主队列的压

3. 资源与配置优化. ⽐如升级RabbitMQ服务器的硬件, 调整RabbitMQ的配置参数等


http://www.kler.cn/news/327287.html

相关文章:

  • ansible学习之 Facts
  • Python知识点:如何使用EdgeX Foundry与Python进行边缘计算
  • 使用iTextPDF库时,设置文字为中文格式
  • 基于微信小程序的美食推荐系统
  • 鸿蒙NEXT入门到实战(基于最新api12稳定版)
  • DevExpress WinForms中文教程:Data Grid - 如何添加或删除行?
  • 基于springboot的评分评教管理系统
  • C#进阶-读写Excel常用框架及其使用方式
  • Edge SCDN:安全与速度并进的解决方案
  • C嘎嘎入门篇:类和对象(2)
  • JVM运行情况预估
  • 分库分表还是分布式?如何用 OceanBase的单机分布式一体化从根本上解决问题
  • 从Elasticsearch到RedisSearch:探索更快的搜索引擎解决方案
  • 回归预测|基于小龙虾优化LightGBM的数据回归预测Matlab程序COA-LightGBM 多特征输入单输出 含基础模型
  • SQL Server 分页查询的学习文章
  • 通信工程学习:什么是CSMA/CA载波监听多路访问/冲突避免
  • sql server连接池爆满排查解决定位
  • 【JavaEE】——多线程常用类和常用数据结构(精华长文)
  • 【NTN 卫星通信】基于NR的NTN RAN架构
  • 【Orange Pi 5嵌入式应用编程】-用户空间UART通信
  • 相亲交友系统的社会影响:家庭结构的变化
  • TFTP协议
  • linux中使用docker命令时提示权限不足
  • 十七、触发器
  • 拿下奇怪的前端报错:某些多摄手机拉取部分摄像头视频流会导致应用崩溃,该如何改善呢?
  • 调用智谱AI,面试小助手Flask简单示例
  • 群面技巧|无领导小组讨论发言技巧|无领导小组讨论角色|无领导小组讨论提问|秋招
  • 【Unity踩坑】使用内购时获取Google Play license key
  • STM32 OLED
  • Java8 IntStream流sum的Bug