RabbitMQ-消息可靠性以及延迟消息
目录
消息丢失
一、发送者的可靠性
1.1 生产者重试机制
1.2 生产者确认机制
1.3 实现生产者确认
(1)开启生产者确认
(2)定义ReturnCallback
(3)定义ConfirmCallback
二、MQ的持久化
2.1 数据持久化
2.2 LazyQueue
2.2.1 控制台配置Lazy模式
2.2.2 代码配置Lazy模式
2.2.3 更新已有队列为lazy模式
三、消费者的可靠性
3.1 消费者确认机制
3.2 失败重试机制
3.3 失败处理策略
3.4 业务幂等性
3.4.1 唯一消息ID
3.4.2 业务判断
四、延迟消息
4.1 死信交换机
4.2 DelayExchange(延迟消息插件)
4.2.1 声明延迟交换机
4.2.2 发送延迟消息
4.3 超时订单问题
消息丢失
消息从生产者到消费者的每一步都可能导致消息丢失:
-
发送消息时丢失:
-
生产者发送消息时连接MQ失败
-
生产者发送消息到达MQ后未找到
Exchange
-
生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
-
消息到达MQ后,处理消息的进程发生异常
-
-
MQ导致消息丢失:
-
消息到达MQ,保存到队列后,尚未消费就突然宕机
-
-
消费者处理消息时:
-
消息接收后尚未处理突然宕机
-
消息接收后处理过程中抛出异常
-
一、发送者的可靠性
确保生产者一定能把消息发送到MQ
1.1 生产者重试机制
生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate
与MQ连接超时后,多次重试。
修改publisher
模块的application.yaml
文件
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
1.2 生产者确认机制
在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
-
MQ内部处理消息的进程发生了异常
-
生产者发送消息到达MQ后未找到
Exchange
-
生产者发送消息到达MQ的
Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
-
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
-
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
-
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
-
其它情况都会返回NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
1.3 实现生产者确认
(1)开启生产者确认
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
-
none
:关闭confirm机制 -
simple
:同步阻塞等待MQ的回执 -
correlated
:MQ异步回调返回回执
(2)定义ReturnCallback
每个RabbitTemplate
只能配置一个ReturnCallback,
因此我们可以在配置类中统一设置
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
(3)定义ConfirmCallback
作用:用于消息发布的确认。
触发时机:当生产者发送消息到 RabbitMQ 时,RabbitMQ 会告诉生产者消息是否成功被接收。这个回调接口用于通知生产者消息是否成功被 RabbitMQ 接收并存储。如果消息被正确处理,它会调用 ConfirmCallback
,表示消息已成功送达队列。
常见应用场景:用来确保消息已经被 RabbitMQ 接收,防止消息丢失。
简单来说,ConfirmCallback
就是用来告诉生产者:“你的消息已被成功接收并存储。”
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
-
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆 -
SettableListenableFuture
:回执结果的Future对象
将来MQ的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
-
路由失败:一般是因为RoutingKey错误导致,往往是编程导致
-
交换机名称错误:同样是编程错误导致
-
MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
二、MQ的持久化
2.1 数据持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
-
交换机持久化
-
队列持久化
-
消息持久化
都可以用控制台实现
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
2.2 LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
-
消费者宕机或出现网络故障
-
消息发送量激增,超过了消费者处理速度
-
消费者处理业务发生阻塞
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
-
接收到消息后直接存入磁盘而非内存
-
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
-
支持数百万条的消息存储
2.2.1 控制台配置Lazy模式
在添加队列的时候,添加x-queue-mod=lazy
参数即可设置队列为Lazy模式
2.2.2 代码配置Lazy模式
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
2.2.3 更新已有队列为lazy模式
(1)命令行设置policy
(2)控制台配置policy
三、消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。
3.1 消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
-
ack:成功处理消息,RabbitMQ从队列中删除该消息
-
nack:消息处理失败,RabbitMQ需要再次投递消息
-
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用 -
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活 -
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:-
如果是业务异常,会自动返回
nack
; -
如果是消息处理或校验异常,自动返回
reject
;
-
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
3.2 失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力
应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回reject,消息会被丢弃
3.3 失败处理策略
本地测试达到最大重试次数后,消息会被丢弃。
Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
-
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队 -
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
3.4 业务幂等性
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
-
页面卡顿时频繁刷新导致表单重复提交
-
服务间调用的重试
-
MQ消息的重复投递
举例:
-
假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
-
由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
-
但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
-
退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
-
唯一消息ID
-
业务状态判断
3.4.1 唯一消息ID
这个思路非常简单:
-
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
-
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
-
如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
3.4.2 业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
四、延迟消息
在电商支付流程中,对于库存有限的商品,如电影票或高铁票,通常在用户下单时就会锁定库存资源,以避免其他用户购买。然而,如果用户下单后一直不付款,这将导致库存占用,阻止其他用户购买。因此,电商平台通常会设置订单超时机制,如果订单超过一定时间未支付,系统会自动取消订单并释放库存。
在这种场景下,我们需要定时检查未支付订单的状态。理想的做法是,在用户下单后30分钟检查订单状态,如果未支付,则取消订单并释放库存。为了解决这个问题,我们需要使用延迟任务,即让系统在一段时间后执行特定操作。
延迟任务的实现通常借助消息队列(MQ)来完成,RabbitMQ提供了两种方式来实现延迟消息:
- 死信交换机 + TTL(过期时间)
- 延迟消息插件
4.1 死信交换机
死信(Dead Letter)是指当消息在队列中无法正常消费时,进入的特殊状态。以下是常见的死信情况:
- 消费者拒绝消息并且不希望消息重新入队。
- 消息超时未被消费。
- 队列满时无法投递的消息。
当消息成为死信时,如果队列配置了死信交换机(DLX),消息将被发送到该交换机,再根据路由规则投递到对应的队列中。
死信交换机有什么作用呢?
-
收集那些因处理失败而被拒绝的消息
-
收集那些因队列满了而被拒绝的消息
-
收集因TTL(有效期)到期的消息
通过死信交换机实现延迟消息的原理如下:
- 发送消息到一个TTL(时间到期)队列,设置消息的有效期(TTL)。
- 当消息过期后,它会成为死信,并且被转发到死信交换机。
- 在死信交换机中,消息被投递到另一个队列(通常是目标处理队列),并开始被消费。
注意:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
4.2 DelayExchange(延迟消息插件)
基于docker安装
先查看RabbitMQ的插件目录对应的数据
docker volume inspect mq-plugins
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4.2.1 声明延迟交换机
(1)注解
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
(2)@Bean
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
4.2.2 发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。