RabbitMQ消息相关
MQ的模式:
- 基本消息模式:一个生产者,一个消费者
- work模式:一个生产者,多个消费者
- 订阅模式:
fanout广播模式:在Fanout模式中,一条消息,会被所有订阅的队列都消费。
在广播模式下,消息发送流程:
- 可以有多个消费者
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
- 交换机把消息发送给绑定过的所有队列
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 每个队列都要绑定到Exchange(交换机)
- 每个消费者有自己的queue(队列)
direct定向模式:
在Direct下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
- topic通配符模式:
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。
单一模式;集群:普通模式(如果做了持久化,需要等待rabbit01恢复才可以被消费,如果没有做持久化,可能造成消息丢失),镜像模式
1、什么是RabbitMQ?为什么使用RabbitMQ?主要特性?
答:RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的,消息中间件;
可以用它来:解耦、异步、削峰添谷。
主要特性
- 可伸缩性:集群服务【主从模式的】
- 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存**【队列的持久化、交换机的持久化、消息持久化】**
1.1、解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
- 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
- 订单系统与库存系统耦合
引入消息队列
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
- 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。
基于消息的模型,关心的是“通知”,而非“处理”。
短信、邮件通知、缓存刷新等操作使用消息队列进行通知。
1.2、消息队列和RPC的区别与比较:
RPC: 异步调用,及时获得调用结果,具有强一致性结果,关心业务调用处理结果。
消息队列:两次异步RPC调用,将调用内容在队列中进行转储,并选择合适的时机进行投递(错峰流控)
异步提升效率
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种
1.串行的方式;2.并行方式
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
(3)引入消息队列,将不是必须的业务逻辑,异步处理。
流量削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。关于秒杀系列文章可以关注公众号Java技术栈获取阅读。
应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。
之前架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。
但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。
引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。
系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。
这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。
我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。
2、RabbitMQ有什么优缺点?
答:优点:解耦、异步、削峰;
缺点:降低了系统的稳定性:本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,系统也就崩溃了。因此,系统可用性会降低;
增加了系统的复杂性:加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
3、如何保证RabbitMQ的高可用?
答:搭建集群的RabbitMQ服务器提供服务,一台风险太大;
4、如何保证RabbitMQ不被重复消费?
答:先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;
但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
【即消费者发送 ACK 前崩溃,MQ 会重新投递消息,可能导致重复处理】
针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息幂等性;
比如:在写入消息队列的数据做唯一标示(比如放到redis中),消费消息时,根据唯一标识判断是否消费过;
ACK两种模式
自动 ACK(Auto-Acknowledgement)
- 定义:消费者收到消息后,MQ 立即自动标记消息为已确认(无论消费者是否处理成功)。
- 特点:
-
- 简单易用,但可靠性低。
- 若消费者处理消息时崩溃,消息会丢失(因为 MQ 已认为消息被确认)。
- 适用场景:允许少量消息丢失的非关键业务(如日志采集)。
手动 ACK(Manual Acknowledgement)
- 定义:消费者在处理消息后 显式发送 ACK,MQ 才会删除消息。
- 特点:
-
- 可靠性高,但需开发者主动控制。
- 若处理失败,可发送 NACK(Negative Acknowledgement)拒绝消息,触发重试或进入死信队列。
- 适用场景:金融交易、订单处理等对可靠性要求高的场景。
5、如何保证RabbitMQ消息的可靠传输(即保证消息不会丢失)?
答:消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息队列丢失消息、消费者丢失消息;
生产者丢失消息:【生产者发消息的可靠性】
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;
MQ就会发送一个ACK给生产者(包含消息的唯一ID),让生产者知道消息已经正确到达目的队列了;
如果MQ没能处理该消息,则会发送一个Nack消息给生产者,让生产者进行重试操作。
处理Ack和Nack的代码如下所示
rabbitmq:
host: 192.168.112.129
port: 5672
username: admin
password: pass
#确认消息已发送到交换机(Exchange)
#确认消息已发送到队列(Queue)
publisher-confirms: true
publisher-returns: true
消息队列丢数据:消息持久化。【消息队列数据的可靠性】
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。
这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?
这里顺便说一下吧,其实也很容易,就下面两步declare(/dɪˈkler/)
- 队列持久化:将queue的持久化标识durable(/ˈdʊrəbl/)设置为true,则代表是一个持久的队列
- 交换机持久化
- 消息持久化:发送消息的时候将delivery_mode=2,// 设置消息是否持久化,1: 非持久化 2:持久化
这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
消费者丢失消息:【消费者消费数据的可靠性】
消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
交换机、队列、消息持久化方式总结:
为了提高并发能力,MQ的数据默认是在内存中存储的,包括交换机、队列、消息。
这样就会出现数据安全问题,如果服务宕机,存储在MQ中未被消费的消息都会丢失。
所以我们需要将交换机、队列、消息持久化到硬盘,以防服务宕机。
交换机持久化:
队列持久化:
消息持久化:
6、如何保证RabbitMQ消息的顺序性?
答:单线程消费保证消息的顺序性;对消息进行编号,消费者处理消息是根据编号处理消息;
把需要顺序性消费的消息放到同一个队列里,只允许一个消费者去消费这个队列,不需要保证顺序的就放到其他队列里。
7、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
发送方确认模式
将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
接收方确认机制
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
消息积压处理办法:临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
如何保证消息的队列的顺序型?
在生产者发送消息给队列时,我们通过绑定多个队列,且每个队列都绑定唯一的消费者,消费者在消费队列中数据时是按顺序来的。
即生产者将消息存放到同一个队列,同一个消费者在读取该队列中的消息是按顺序读取的。
(消息重试【补充】机制)死信队列(Dead-Letter Queue, DLX)
一、死信队列的定义与作用
死信队列(DLX) 是消息队列系统中用于存储无法被正常消费的消息的特殊队列。这些消息被称为 死信(Dead-Letter Messages),通常是由于以下原因导致无法处理:
- 消息被消费者拒绝(Reject/Nack)且未重新入队。
- 消息过期(Time-To-Live, TTL 超时)。
- 队列达到最大长度限制,新消息被丢弃或旧消息被移除。
- 消息无法被正确路由到目标队列(如交换机绑定错误)。
核心作用:
- 容错处理:避免因消息处理失败导致的无限重试或丢失。
- 故障排查:集中管理异常消息,便于后续分析原因。
- 灵活路由:支持对死信消息的再处理(如人工干预或自动重试)。
二、消息成为死信的条件
触发条件 | 说明 |
消息被拒绝且不重入队 | 消费者调用 并设置 |
消息过期(TTL) | 消息或队列设置 TTL 超时后未被消费。 |
队列满 | 队列达到最大长度( 最大容量( |
路由失败 | 消息无法被交换机路由到任何队列 (需配置备用交换机 |
重试机制的设置
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
当我们的消费者在处理我们的消息的时候,程序抛出异常情况下触发自动补偿(默认无限次数重试)
应该对我们的消息重试设置间隔重试时间,
比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)
如果重试后还未消费默认丢弃,如果配置了死信队列,会发送到死信队列中
listener:
simple:
#手动签收
acknowledge-mode: manual
#并发消费者初始化值
concurrency: 10
#并发消费者的最大值
max-concurrency: 20
#每个消费者每次监听时可拉取处理的消息数量
prefetch: 5
retry:
#是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
enabled: true
#最大重试次数
max-attempts: 2
#重试间隔时间(单位毫秒)
initial-interval: 5000
#重试最大时间间隔(单位毫秒):当前时间间隔<max-interval(重试最大时间间隔)
max-interval: 1200000
#应用于前一重试间隔的乘法器:当前时间间隔=上次重试间隔*multiplier。
multiplier: 2
8、消息队列的使用场景
解耦系统、异步处理、削峰填谷
9、消息的重发,补充策略
一、消息重发机制
消息重发通常由消费者处理失败触发,分为 自动重试 和 手动重试 两种模式。
1. 自动重试策略
- 触发条件:
-
- 消费者抛出异常或返回失败状态。
- 消息处理超时(未在指定时间内完成)。
- 核心配置:
-
- 最大重试次数:限制重试次数,避免无限循环(如3次)。
- 重试间隔:逐步增加等待时间(如指数退避)。
- 重试队列隔离:将重试消息路由到独立队列,避免阻塞正常消费。
2. 手动重试策略
- 适用场景:需要人工介入的复杂错误(如数据修复后重试)。
- 实现方式:
-
- 将失败消息持久化到数据库或死信队列。
- 提供管理界面手动触发重试。
二、补充策略
当自动重试无法解决问题时,需结合补充策略确保最终处理。
1. 死信队列(DLX)兜底
- 功能:收集所有重试失败的消息,避免消息丢失。
- 处理方式:
-
- 监控告警:检测死信队列堆积,通知运维人员。
- 人工处理:修复数据或配置后,手动重新投递消息。
- 自动补偿:通过定时任务扫描死信队列,重新投递(需限制频率)。
2. 异步补偿任务
- 场景:消息彻底无法处理时,启动补偿流程。
- 实现:
- 定时任务扫描:检查消息处理状态,重新投递或回滚业务。
- 事务反向操作:如订单支付失败后,补偿释放库存。
3. 幂等性设计
- 必要性:重发可能导致消息重复,需确保业务逻辑幂等。
- 实现方式:
- 唯一标识:为消息分配全局唯一ID(如UUID),处理前检查状态。
- 数据库约束:利用唯一索引或乐观锁防止重复提交。
- 状态机:限制业务操作的状态流转路径(如订单只能支付一次)。
四、最佳实践
- 分级重试策略:
-
- 首次失败立即重试(网络抖动)。
- 后续重试逐步增加间隔(指数退避)。
- 超过阈值转入死信队列。
- 重试次数监控:
-
- 记录消息重试次数和原因,便于分析系统瓶颈。
- 隔离重试流量:
-
- 使用独立队列和消费者组处理重试消息,避免影响正常流量。
- 熔断机制:
-
- 若某类消息持续失败(如依赖服务宕机),暂时停止消费并触发告警。
- 自动化测试:
-
- 模拟消息重发场景,验证幂等性和补偿逻辑的正确性。
五、总结【重点看总结】
消息重发与补充策略的核心目标是 平衡可靠性与系统复杂性:
- 自动重试:解决临时性故障(如网络抖动)。
- 死信队列:兜底无法自动修复的异常。
- 补偿机制:最终确保业务状态一致。
- 幂等性:防御重试导致的重复操作。
待完善