RabbitMQ如何防止消息丢失及重复消费
一、消息丢失
如图所示,RabbitMQ丢失消息的情况可以发送在任何一个节点。
1.1、生产者没有成功把消息发送到MQ
丢失的原因 :因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。
解决办法 : 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制(发布确认机制)。
注意:RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。
两个机制说明如下。
1.1.1、confirm(发布确认)机制
解释:RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以重新发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。
RabbitMQ自带的两个回调:
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/**
* 消息确认成功的回调函数,方法参数:1.消息序列号、2.批量标识,true为批量,false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
//2:删除到已经确认的消息 剩下的就是未确认的消息
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
//删除
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:"+ deliveryTag);
};
//消息确认失败的回调,方法参数:1.消息的标记 2.是否为批量确认
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
//3:打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("发布的消息:"+message+"未被确认,序列号:"+deliveryTag);
};
//添加一个异步确认的监听器 ,方法参数:1.确认收到消息的回调、2、未收到消息的回调
//异步通知
channel.addConfirmListener(ackCallback, nackCallback);
详细代码可看:RabbitMQ之发布确认模式 中的异步发布确认 部分。
RabbitMQ与SpringBoot整合之后 RabbitTemplate 中的两个回调:
ConfirmCallback.confirm() :确认回调方法,成功和失败都要回调,这个也只是起通知作用。
ReturnCallback.returnedMessage():回退方法,在当消息传递过程中 不可达目的地 时将消息返回给生产者,这种情况下,消息就会被直接丢弃,因为它只是把消息的相关信息返回给生产者,具体要去怎么处理,需要我们自己再去处理,建议使用 “备份交换机”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
具体代码可看:发布确认- 整合springboot (回退消息、备份交换机) 要全部看完
1.1.2、事务机制
解释:RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。伪代码如下:
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
1.2、RabbitMQ接收到消息之后丢失了消息
丢失的原因 :RabbitMQ接收到生产者发送过来的消息,是存在内存中的,如果没有被消费完,此时RabbitMQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。
解决办法 :开启RabbitMQ的持久化。当生产者把消息成功写入RabbitMQ之后,RabbitMQ就把消息持久化到磁盘。结合上面的说到的confirm机制,如果开启了消息的持久化,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败 。存入磁盘的消息不会丢失,就算RabbitMQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。
持久化的配置:
第一点是队列持久化,创建 queue 的时候将其设置为持久化,这个时候即使重启 rabbitmq 队列,它也依然存在,但是它是不会持久化 queue 里的数据的。
第二个是消息持久化,发送消息的时候将消息的 deliveryMode 设置为 2,也就是参数:MessageProperties.PERSISTENT_TEXT_PLAIN,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
注意:持久化要起作用必须同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
要保证消息不丢失,我们开启队列、消息持久化的同时,也会开启 confirm(发布确认) 模式。因为即使我们在生产者中设置了队列持久化、消息持久化,但依然存在消息被传送到队列上,还没来得及存储在磁盘上,队列就宕机了,这种情况下消息也是会丢失的。所以在之前两步的基础上还是进行第三步:发布确认。三步操作加一起才能保证消息 从生产者到RabbitMQ服务器这个过程 是不丢失的。
可以参考:RabbitMQ之工作队列模式 以及(多个消费者时的消费策略、消费者消息应答机制、RabbitMQ 持久化操作) 中最后一部分
1.3、消费者弄丢了消息
丢失的原因 :如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。
解决的办法 :简单来说,就是必须关闭 RabbitMQ 的自动 ack,采用手动ack (消息应答机制)。这样的话,如果你还没处理完,不就没有 ack了! 那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。RabbitMQ消息丢失之后的处理:消息自动重新入队。
消息应答机制 : 为了保证消息在消费过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
消息应答机制,分为两种:自动应答、手动应答。
自动应答:
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答。 true代表自动应答 ,false手动应答
* 3.消费者,成功消费的回调
* 4.消费者 取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
手动应答:
//接收消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery)-> {
//休眠1s
SleepUtils.sleep(1);
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Worker03-C1-接收到的消息:"+ message);
//ACK肯定确认
//1.消息标记tag
//2.是否批量应答:false为不批量,true为批量
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//取消消费的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag+ ":Worker03-C1-消息者 取消消费了-回调接口");
};
//接收消息,采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
消息丢失之后的处理:消息自动重新入队 。如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将被重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
可以参考:RabbitMQ之工作队列模式 以及(多个消费者时的消费策略、消费者消息应答机制、RabbitMQ 持久化操作) 中的:消息应答机制—保证消息在消费过程中不丢失,里面有具体的演示。
二、如何防止重复消费
先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;(消费者的 消息应答机制)
但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息的 幂等性;
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分以下几点(还是要在消费方做数据存储时 手动去重):
如果消息是做数据库的insert操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
如果消息是做redis的set的操作,不用解决,因为无论set几次结果都是一样的,set操作本来就是幂等操作。
如果以上两种情况还不行,可以准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。