【RabbitMQ应用篇】常见应用问题
1. 消息幂等性保障
1.1 幂等性介绍
幂等性:这个概念在数学和计算机领域中相当常见,表示可以被应用多次但是不会改变初始应用结果的性质。
应用程序的幂等性:指的是在一个应用系统中,重复调用多次请求(相同参数),对应用系统的影响是相同的。比如说在数据库应用系统当中,使用select
操作就是保证幂等的,这里指的是对于系统资源的影响而不是返回的结果,即select操作多次对于系统资源不会产生影响,但是返回结果不一致则是由于其他操作导致的。
MQ的消息幂等性:对于MQ而言,幂等性指的就是同一条消息被多次消费,对系统的影响是相同的
一般来说消息中间件产品的消息传输保障分为以下三个层级:
- At most once:最多一次,消息可能会丢失,但是绝对不会重复传输
- At least once:最少一次,消息不可能丢失,但是可能会重复传输多次
- Exactly once:恰好一次,每条消息肯定会且仅会传输一次
当前RabbitMQ可以支持"最多一次"、“最少一次”,但是很难做到"恰好一次"(别的主流MQ产品也很难做到)通常在业务使用过程中,对于消息可靠性要求较高的建议使用"最少一次",如果消息可靠性要求不高,可以考虑使用"最多一次"
1.2 最少一次导致的幂等性问题
在当前的消息架构图当中,如果采用"最少一次"的方式保证消息的可靠性,就会存在以下两种情况导致消息重复:
- 当Producer成功将消息投递给Broker并完成持久化之后,Broker根据生产者确认机制会调用
confirmCallBack
,此时由于网络问题导致消息没有成功传输给Producer,此时Producer会尝试重发消息 - 当Broker成功将消息投递给对应的Consumer时,根据消息确认机制,Consumer会手动确认basicAck,但是由于网络等问题导致Ack信息没有正确到达Broker,超过指定时间阈值后Broker会重新投递消息
上述两种情况都会出现消费者重复消费同一条信息多次的问题,如果是一个付款业务,如果不保证幂等性就会造成重复扣款,因此针对特殊的业务场景我们需要保证消息的幂等性
1.3 解决方案
MQ当中消息幂等性的解决方案通常有以下几种:
1.3.1 全局唯一ID
- 为每一条消息分配一个唯一标识符,比如UUID、全局自增ID、时间戳+ID,总之一定要保证唯一性
- 当消费者收到消息之后,先用id判断该消息是否被消费过,如果已经消费过则不进行处理
- 如果未消费过,消费者则进行相应业务逻辑处理,处理完毕后将该消息ID保存到数据库中(MySQL或者redis)
以redis为例,我们可以用setnx命令保证幂等性,发送setnx messageID 1,如果setnx返回为0证明该消息已经被消费过则不处理,如果返回为1则正常消费!
1.3.2 业务逻辑判断
此方式就是从实际业务背景出发实现消息的幂等性,例如说检查数据库是否已经存在相应记录,或者使用乐观锁机制避免更新被其他事务修改的数据。
比如在一个支付系统中,收到多个支付订单的请求,则根据业务逻辑查询对应的订单状态,如果是"已支付"状态,则无需重复支付处理!
2. 消息顺序性保障
2.1 顺序性介绍
消息顺序性:指的就是生产者发送消息的顺序和消费者消费消息的顺序是否保持一致的特性
比如生产者发送的消息顺序分别是:msg1、msg2、msg3,那么如果消费者消费消息的顺序也是msg1、msg2、msg3,则说明是实现了顺序性保证的,在很多业务场景中无需保证消息顺序性,比如订单超时取消,但是有些场景需要保证消息顺序性:比如个人信息修改:
如果此时消费者消费消息的顺序为1. 修改姓名为米饭不好吃,2. 修改姓名为米饭好好吃,此时就出现问题了!下面会打乱RabbitMQ消息顺序性的典型场景:
2.2 影响消息顺序性的因素
在RabbitMQ当中,如果只有单个队列、单个消费者的情况,确实能保证消息的顺序性,但是以下场景会破坏消息顺序性:
- 配置了多个消费者:此时某个队列中的消息会被多个消费者并行处理,导致消息处理顺序不可控
- 消息路由问题:在复杂的路由场景中,可能某个消息会路由到不同的队列中,导致消息被消费的顺序不可控
- 网络波动异常:当消息确认的Ack因为网络原因丢失之后,配置消息重新入队、重新投递此时可能会导致消息的乱序到达
- 消息重试机制:与网络波动原因类似,在配置消息重试机制之后,当某个消息传输出现异常后,根据重试机制重新投递,造成消息乱序到达
- 死信队列:某些消息因为特殊原因被消费者拒绝而放入死信队列,死信队列被消费时无法保证消息的顺序性
2.3 解决方案
消息的顺序性保证分为全局顺序性保证和局部顺序性保证:
局部顺序性保证指的就是在单个队列内部保证消息的顺序,全局顺序性保证就是指在多个队列、多个消费者之间保证消息的顺序性,由于RabbitMQ是一款分布式消息队列产品,提高系统的吞吐量和可用性是其重点,如果针对某种特定业务场景确实需要保证全局顺序性,可以引入业务逻辑进行处理,此处主要针对局部消息顺序性保证:
- 方案一:转化为单队列、单消费者模型,这是最简单的一种方式,RabbitMQ内部已经实现好了
- 方案二:分区消费,如果一定需要多消费者提高系统吞吐量,此时我们可以使用分区消费,即将一个队列划分为多个分区,每个分区由一个消费者进行处理,以此来保证每个分区内消息的顺序性,可以借助Spring-Cloud-Stream实现
- 方案三:消息确认机制,使用手动确认机制,只有当上一条消息消费后显式发送确认,才会发送下一条消息
- 方案四:业务逻辑控制:比如在消息中嵌入全局序列号,及时消息乱序到达也能在消费者端根据一定的业务逻辑进行排序消费
3. 消息积压问题
3.1 原因分析
消息积压:指的是在RabbitMQ当中,待处理的消息数量超过了消费者的消费能力,导致消息不断积压在队列当中
通常有以下原因:
- 消息生产过快:比如在特定的时间节点比如双十一,生产者会以极高的速率生产消息
- 消费者消费能力不足:有以下几种情况
- 消费端消费逻辑复杂,耗时久
- 消费端代码性能低
- 系统资源限制,比如CPU、网络带宽限制了消费者的消息速率
- 异常处理不当,消费者在处理消息异常的过程中没有进行消息确认
- 网络问题:由于网络的波动导致消费者无法及时接收和确认消息,最终导致消息积压
- RabbitMQ服务器性能配置低下
3.2 解决方案
在遇到消息积压问题后我们需要根据产生的原因对症下药:
- 提高消费者效率:
- 增加消费者机器实例数
- 优化业务逻辑处理过程,比如使用多线程优化性能
- 当消息发生异常时,转入死信队列或者配置合适的重试策略
- 设置prefetchCount,当某个消费者阻塞则转移到其他消费者
- 降低生产者效率:
- 进行流量控制:根据消费者消费能力动态调整发送速率
- 限流:使用限流工具设置发送速率上限
- 设置过期时间:如果某个消息长时间没有被处理就转入死信队列
- 资源和优化配置:提升RabbitMQ服务器性能,提升网络带宽