RabbitMQ高级特性
1. 消息确认(消费者)
1.1 消息确认机制
⽣产者发送消息, 到达消费端之后, 可能会有以下情况:
a.消息处理成功
b.消息处理异常
RabbitMQ向消费者发送消息之后, 就会把这条消息删掉,那么如何确保消费端已经成功接收了, 并正确处理了呢?
为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制
消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:
- ⾃动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, ⽽不管消费者是否真正地消费到了这些消息. ⾃动确认模式适合对于消息可靠性要求不⾼的场景.
- ⼿动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令, 回复确认信号后才从内存(或者磁盘)中移出消息,这种模式适合对消息可靠性要求⽐较⾼的场景.
当autoAck参数置为false, 对于RabbitMQ服务端⽽⾔, 队列中的消息分成了两个部分:
⼀是等待投递给消费者的消息.
⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息.
当然,如果RabbitMQ⼀直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ 会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数:
1.2 手动确认方法
肯定确认
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息. 可以将其丢弃了
参数说明:
1) deliveryTag:
消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道 (Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应的通道上进⾏确认.
它确保了消息传递的可靠性和顺序性
2) multiple:
是否批量确认. 在某些情况下, 为了减少⽹络流量, 可以对⼀系列连续的 deliveryTag 进⾏批量确认.
值为 true 则会⼀次性 ack所有⼩于或等于指定 deliveryTag 的消息.
值为false, 则只确认当前指定deliveryTag的消息
举个例子:
否定确认
Channel.basicReject(long deliveryTag, boolean requeue)
channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.
参数说明:
1)requeue:
表⽰拒绝后, 这条消息如何处理.⼀次只能拒绝⼀条消息
如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者.
如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者
否定确认(批量)
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
想要批量拒绝消息,则可以使⽤Basic.Nack这个命令
1.3 代码示例
Spring-AMQP 对消息确认机制提供了三种策略:
public enum AcknowledgeMode{
NONE,
MANUAL,
AUTO;
}
1.AcknowledgeMode.NONE
这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.(相当于自动确认)
2.AcknowledgeMode.AUTO(默认)
这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息.
3.AcknowledgeMode.MANUAL
⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被重新处理
1.3.1 AcknowledgeMode.NONE
①配置
②发送消息
队列,交换机配置:
声明:
@Configuration
public class RabbitMQConfig {
//消息确认
@Bean("ackQueue")
public Queue ackQueue(){
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
//return QueueBuilder.nonDurable().build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).build();
}
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ack");
}
}
通过接口发送消息:
@RequestMapping("/producer")
@RestController
public class ProducerController {
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "consumer ack mode test...");
return "消息发送成功";
}
}
③消费者
@Component
public class ACKListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//消费者逻辑
System.out.printf("接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
//业务逻辑处理
System.out.println("业务逻辑处理");
//消费者处理异常,MQ删除相应信息
int num = 3/0;
System.out.println("业务处理完成");
}
}
先把消费者注掉:
开启后,控制台输出:
管理界面:
可以看到, 消费者处理失败, 但是消息已经从RabbitMQ中移除
1.3.2 AcknowledgeMode.AUTO(默认)
①配置
② 重新运行
一直在重新入队:
管理界面:
从⽇志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发.
由于异常,多次重试还是失败,消息没被确认,也⽆法nack,就⼀直是unacked状态,导致消息积压.
1.3.3 AcknowledgeMode.MANUAL
①配置
②消费端
如果不确认会怎么样?
即使消费者处理成功,也不会确认
修改消费端代码:
@Component
public class ACKListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
//业务逻辑处理
System.out.println("业务逻辑处理");
//消费者处理异常,MQ删除相应信息
//int num = 3/0;
System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,true);
}
}
}
如果正常处理:
消费端启动:
异常处理:
消费异常时不断重试, deliveryTag 从1递增
管理界⾯上unacked也变成了1
2. 持久性
2.1 交换机持久化
如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失
true-持久化
fasle-非持久化
2.2 队列持久化
如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没有了, 消息也⽆处可存了)
队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化.
点进去看源码会发现,该⽅法默认durable 是true
通过nonDurable创建非持久化队列:
2.3 消息持久化
消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2, 也就是 MessageDeliveryMode.PERSISTENT
使⽤RabbitTemplate 发送持久化消息, 代码如下:
RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为⾮持久化,或者消息在发送时被标记为⾮持久化
综上:
可以通过重启rabbitmq进行测试
非持久化的D标识会消失
测试几组:
注意:
将所有的消息都设置为持久化, 会严重影响RabbitMQ的性能(随机).
写⼊磁盘的速度⽐写⼊内存的速度慢得不只⼀点点.
对于可靠性不是那么⾼的消息可以不采⽤持久化处理以提⾼整体的吞吐量.
在选择是否要将消息持久化时, 需要在可靠性和吐吞量之间做⼀个权衡
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
答案是否定的
- 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之 后, 还没来得及处理就宕机了, 这样也算数据丢失. 这种情况很好解决, 将autoAck参数设置为 false, 并进⾏⼿动确认.
- 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理, 可能仅仅保存到操 作系统缓存之中⽽不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失.
那么怎么解决呢?
- 引⼊RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉, 可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性, 除⾮整个集群都挂掉(此⽅法也不能保证100%可靠, 但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多, 实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
- 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ 中(发送方确认)
3. 发送方确认
当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢?
RabbitMQ提供了两种解决⽅案:
a. 通过事务机制实现(事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多)
b. 通过发送⽅确认(publisher confirm) 机制实现
RabbitMQ提供了两个⽅式来控制消息的可靠性投递:
1. confirm确认模式
2. return退回模式
两种方式并不互斥,可以同时使用
3.1 confirm模式
Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达 Exchange, 这个监听都会被执⾏
如果Exchange成功收到, ACK为true, 如果没收到消息, ACK就为false.
①配置
②发送消息
交换机,队列:
//发送方确认
@Bean("confirmQueue")
public Queue firmQueue(){
return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();
//return QueueBuilder.nonDurable().build();
}
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(Constant.CONFIRM_EXCHANGE).build();
}
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("confirm");
}
回调逻辑:
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack){
System.out.printf("接收到消息,ID: %s",correlationData == null?null:correlationData.getId());
}else {
System.out.printf("未接收到消息,ID: %s,cause: %s\n",correlationData==null?null:correlationData.getId(),cause);
//相应的业务逻辑处理
}
}
});
return rabbitTemplate;
}
}
生产者:
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm(){
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);
return "发送成功";
}
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
在RabbitMQ中, ConfirmListener和ConfirmCallback都是⽤来处理消息确认的机制, 但它们属于不同 的客⼾端库, 并且使⽤的场景和⽅式有所不同.
1. ConfirmListener 是 RabbitMQ Java Client 库中的接⼝. 这个库是 RabbitMQ 官⽅提供的⼀个直接与RabbitMQ服务器交互的客⼾端库. ConfirmListener 接⼝提供了两个⽅法: handleAck 和 handleNack, ⽤于处理消息确认和否定确认的事件.
2. ConfirmCallback 是 Spring AMQP 框架中的⼀个接⼝. 专⻔为Spring环境设计. ⽤于简化与 RabbitMQ交互的过程. 它只包含⼀个 confirm ⽅法,⽤于处理消息确认的回调.
在 Spring Boot 应⽤中, 通常会使⽤ ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可 以利⽤ Spring 的配置和依赖注⼊功能. ⽽在使⽤ RabbitMQ Java Client 库时, 则可能会直接实现 ConfirmListener 接⼝, 更直接的与RabbitMQ的Channel交互
③测试
接下来把交换机名称改下, 重新运⾏, 会触发另⼀个结果:
这里刚开始,我是这样写的:
@RequestMapping("/confirm") public String confirm(){ //设置回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("执行了confirm方法"); if (ack){ System.out.printf("接收到消息,ID: %s",correlationData == null?null:correlationData.getId()); }else { System.out.printf("未接收到消息,ID: %s,cause: %s\n",correlationData==null?null:correlationData.getId(),cause); //相应的业务逻辑处理 } } }); CorrelationData correlationData = new CorrelationData("1"); confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData); return "发送成功"; }
但是后面发现:
再次调用接口
日志台:
因为每个RabbitTemplate只能设置一次回调函数
并且此时运行其它接口:
说明了会影响所有使用RabbitTemplate的方法
RabbitTemplate是单例的,在这里修改了,则其它的地方也被修改
所以要单独设置
并且如果没有注明使用哪个,默认使用我们自己设定的
3.2 return模式
消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者.
消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理.
①配置
②发送消息
@RequestMapping("/returns")
public String returns(){
//confirmRabbitTemplate.setReturnsCallback();
CorrelationData correlationData = new CorrelationData("5");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm11","return test...",correlationData);
return "发送成功";
}
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack){
System.out.printf("接收到消息,ID: %s",correlationData == null?null:correlationData.getId());
}else {
System.out.printf("未接收到消息,ID: %s,cause: %s\n",correlationData==null?null:correlationData.getId(),cause);
//相应的业务逻辑处理
}
}
});
//消息退回时,回调方法
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息退回:"+returnedMessage);
}
});
return rabbitTemplate;
}
}
使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false). 这个属性 的作⽤是告诉RabbitMQ, 如果⼀条消息⽆法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发.
回调函数中有⼀个参数,ReturnedMessage, 包含以下属性:
③测试
3.3 如何保证RabbitMQ消息的可靠传输?
从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
1. ⽣产者将消息发送到 RabbitMQ失败
- a. 可能原因: ⽹络问题等
- b. 解决办法: 发送⽅确认-confirm确认模式
2. 消息在交换机中⽆法路由到指定队列:
- a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
- b. 解决办法: 发送⽅确认-return模式
3. 消息队列⾃⾝数据丢失:
- a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
- b. 解决办法: 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果 RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性)
4. 消费者异常, 导致消息丢失:
- a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
- b. 解决办法: RabbitMQ 提供了 消费者应答机制来使 RabbitMQ 能够感知 到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制 当消息消费异常时, 通过消息重试确保消息的可靠性
4. 重试机制
在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
4.1 配置
4.2 交换机,队列
//重试机制
@Bean("retryQueue")
public Queue retryQueue(){
return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
//return QueueBuilder.nonDurable().build();
}
@Bean("retryExchange")
public DirectExchange retryExchange(){
return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange directExchange, @Qualifier("retryQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("retry");
}
4.3 发送消息
@RequestMapping("/retry")
public String retry(){
rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry test...");
return "发送成功";
}
4.4 消费消息
@Component
public class RetryListener {
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constant.RETRY_QUEUE+"]接收到消息: %s,deliveryTag: %s\n",new String(message.getBody(),"UTF-8"),
deliveryTag);
int sum=3/0;
System.out.println("处理成功");
}
}
4.5 运行程序,观察结果
消息一直在重试,知道最大限制
达到次数之后,抛出异常
4.6 手动确认
@Component
public class RetryListener {
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void handlerMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constant.RETRY_QUEUE+"]接收到消息: %s,deliveryTag: %s\n",new String(message.getBody(),"UTF-8"),
deliveryTag);
try {
int sum=3/0;
System.out.println("处理成功");
channel.basicAck(deliveryTag,false);
}catch (Exception e){
channel.basicNack(deliveryTag,false,true);
}
}
}
在auto下:
manual:
此时tag会自动增加,而auto不会
可以看到, ⼿动确认模式时, 重试次数的限制不会像在⾃动确认模式下那样直接⽣效, 因为是否重试以及何时重试更多地取决于应⽤程序的逻辑和消费者的实现
⾃动确认模式下, RabbitMQ 会在消息被投递给消费者后⾃动确认消息. 如果消费者处理消息时抛出异常, RabbitMQ 根据配置的重试参数⾃动将消息重新⼊队
注意:
- ⾃动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被⾃动确认, 那么消息就丢失了
- ⼿动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, ⽆法被确认, 就⼀直是 unacked的状态, 导致消息积压
5.TTL
TTL, 即过期时间. RabbitMQ可以对消息和队列设置TTL
当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除
5.1 设置消息的TTL
有两种⽅法可以设置消息的TTL:
a. 设置队列的TTL, 队列中所有消息都有相同的过期时间
b. 对消息本⾝进⾏单独设置
如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.
①对每条消息设置TTL
交换机,队列:
//ttl
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange directExchange, @Qualifier("ttlQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("ttl");
}
发送消息:
@RequestMapping("/ttl")
public String ttl(){
// MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
// @Override
// public Message postProcessMessage(Message message) throws AmqpException {
// message.getMessageProperties().setExpiration("10000");//单位毫秒
// return message;
// }
// };
// rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","ttl test...",messagePostProcessor);
//简写
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","ttl test 30s...",message -> {
message.getMessageProperties().setExpiration("30000");//单位毫秒
return message;
});
}
运行程序,看结果:
3s后消失:
5.2 设置队列的TTL
设置队列TTL的⽅法是在创建队列时, 加⼊ x-message-ttl 参数实现的, 单位是毫秒
也可以通过下面代码设置:
下面通过接口,查看:
队列Features有⼀个TTL标识
20s后,消息已被删除
5.3 区别
设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的.
为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.
⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可
举个例子::
如果消息ttl 10s,队列20s(同时设置)
ttl队列未设置过期时间
ttl2设置过期时间为20s
发送一条ttl为10s消息,取其短
可以发现,当10s后第一条消息没有消失,而是两条一起消失
6. 死信队列
消息变成死信⼀般是由于以下⼏种情况:
1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false
2. 消息过期
3. 队列达到最⼤⻓度
6.1 声明队列和交换机
- 声明正常的队列和正常的交换机
- 声明死信队列和死信交换机
//死信
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange directExchange, @Qualifier("normalQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("normal");
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constant.DL_QUEUE).build();
//return QueueBuilder.nonDurable().build();
}
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlExchange") DirectExchange directExchange, @Qualifier("dlQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("dlx");
}
6.2 正常队列绑定死信交换机
当这个队列中存在死信时, RabbitMQ会⾃动的把这个消息重新发布到设置的DLX上, 进⽽被路由到另⼀ 个队列, 即死信队列.可以监听这个死信队列中的消息以进⾏相应的处理
6.3 死信条件
6.4 发送消息
@RequestMapping("/dl")
public String dl(){
//发送普通信息
//rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","dl test...");
//测试队列长度
for (int i = 0; i<20;i ++){
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","dl test..."+i);
}
return "发送成功";
}
6.5 测试
程序启动之后, 观察队列
- D: durable的缩写, 设置持久化
- TTL: Time to Live, 队列设置了TTL
- Lim: 队列设置了⻓度
- DLX: 队列设置了死信交换机
- DLK: 队列设置了死信
6.5.1 过期时间
10秒后, 消息进⼊到死信队列:
⽣产者⾸先发送⼀条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中.
由于队列normal_queue设置了过期时间为10s, 在这10s内没有消费者消费这条消息, 那么判定这条消息过期.
由于设置了DLX, 过期之时, 消息会被丢给交换器(dlx_exchange)中, 这时根据RoutingKey匹配, 找到匹配的队列(dlx_queue), 最后消息被存储在queue.dlx这个死信队列中.
6.5.2 队列长度
发送前, 死信队列没有数据
发送20条消息
运⾏后, 可以看到死信队列变成了10条:
过期之后, 正常队列的10条也进入死信队列:
6.5.3 消息拒收
消费者:
@Component
public class DLXListener {
@RabbitListener(queues = Constant.NORMAL_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("[normal.queue]接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
//业务逻辑处理
System.out.println("业务逻辑处理");
//消费者处理异常,MQ删除相应信息
int num = 3/0;
System.out.println("业务处理完成");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,false);//变成死信
}
}
@RabbitListener(queues = Constant.DL_QUEUE)
public void dlHandMessage(Message message, Channel channel) throws Exception {
System.out.printf("[dl.queue]接收到消息: %s,deliveryTag: %d \n",
new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
}
}
发送消息,运行:
6.6 常⻅⾯试题
6.6.1 死信队列的概念
死信是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息
6.6.2 死信的来源
1) 消息过期:
消息在队列中存活的时间超过了设定的TTL
2) 消息被拒绝:
消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如 果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
3) 队列满了:
当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信
6.6.3 应⽤场景
死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正 确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统
⽐如:
⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态
为了保证⽀付信息不丢失, 需要使⽤到死信队列机制.
消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).
场景的应⽤场景还有:
消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理
消息丢弃:直接丢弃这些无法处理的消息,以避免它们占⽤系统资源
7. 延迟队列
7.1 概念
延迟队列,即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后, 消费者才能拿到这个消息进⾏消费
7.2 应用场景
- 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
- ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
- ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等
- ...
但是,RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 可以通过的TTL+死信队列的⽅式组合模拟出延迟队列的功能
举个例子:
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中.
消费者订阅的并⾮是 normal_queue 这个队列, ⽽是 dlx_queue 这个队列.
当消息过期之后,被存⼊ dlx_queue 这个 队列中,消费者就恰巧消费到了延迟10秒的这条消息.
7.3 TTL+死信队列实现
继续沿⽤死信队列的代码即可
发送消息:
@RequestMapping("/dl")
public String dl(){
//发送普通信息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","dl test...");
System.out.printf("%tc 消息发送成功\n",new Date());
return "发送成功";
}
消费端:
@Component
public class DLXListener {
@RabbitListener(queues = Constant.DL_QUEUE)
public void dlHandMessage(Message message, Channel channel) throws Exception {
System.out.printf("[dl.queue] %tc 接收到消息: %s,deliveryTag: %d \n",new Date(),
new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
}
}
可以看出,在10s后接收到消息:
存在问题:
先发送30s过期数据, 再发送10s过期数据
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.build();
//return QueueBuilder.nonDurable().build();
}
@RequestMapping("delay")
public String delay(){
//简写
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","ttl test 10s...",message -> {
message.getMessageProperties().setExpiration("10000");//单位毫秒
return message;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","ttl test 30s...",message -> {
message.getMessageProperties().setExpiration("30000");//单位毫秒
return message;
});
System.out.printf("%tc 消息发送成功 \n",new Date());
return "发送成功..";
}
发现,10s过期的消息, 是在30s后才进⼊到死信队列
先发送10s,后30s:
通过TTL+死信方式实现,当后来的消息过期时间早于先到消息时,延迟功能出现问题
7.4 延迟队列插件
安装插件
地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
然后将上面下载的内容放到rabbitmq的安装地址
插件上传⽬录参考:installing Additional Plugins | RabbitMQ
/usr/lib/rabbitmq/plugins 是⼀个附加⽬录, RabbitMQ包本⾝不会在此安装任何内容, 如果没有这个路径, 可以⾃⼰进⾏创建
当前目录下没有任何文件,将下载的内容复制进去
删除子文件:
启动插件
#查看插件列表
rabbitmq-plugins list
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
service rabbitmq-server restart
验证插件:
使用插件
声明交换机,队列:
@Configuration
public class DelayConfig {
@Bean("delayQueue")
public Queue delayQueue(){
return QueueBuilder.durable(Constant.DELAY_QUEUE).build();
}
@Bean("delayExchange")
public Exchange delayExchange(){
return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().build();//延迟类型
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(delayExchange()).with("delay").noargs();
}
}
发送消息:
@RequestMapping("delay2")
public String delay2(){
//简写
rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay","delay test 10s...",message -> {
message.getMessageProperties().setDelay(20000);//单位毫秒
return message;
});
rabbitTemplate.convertAndSend(Constant.DELAY_EXCHANGE,"delay","delay test 20s...",message -> {
message.getMessageProperties().setDelay(10000);//单位毫秒
return message;
});
System.out.printf("%tc 消息发送成功 \n",new Date());
return "发送成功..";
}
消费端:
//@Component
public class DelayListener {
@RabbitListener(queues = Constant.DELAY_QUEUE)
public void delayHandMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
System.out.printf("[delay queue] %tc 接收到消息: %s \n",new Date(),
new String(message.getBody(),"UTF-8"));
}
}
运行,看结果:
我们知道ttl+死信,会出现消息乱序,那插件会吗?
下面先发30s,再发10s:
把消费端注掉:
10s后:
20s后:
常见面试题
介绍RabbitMQ的延迟队列
延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者. 延迟队列的应⽤场景有很多
应用场景:
- 订单在⼗分钟内未⽀付⾃动取消
- ⽤⼾注册成功后, 3天后发调查问卷
- ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款
- ...
RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:
- TTL+死信队列组合的⽅式
- 使⽤官⽅提供的延迟插件实现延迟功能
两种实现方式对比
1.基于死信实现的延迟队列
a. 优点:
- 灵活不需要额外的插件⽀持
b. 缺点:
- 存在消息顺序问题
- 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
2. 基于插件实现的延迟队列
a. 优点:
- 通过插件可以直接创建延迟队列, 简化延迟消息的实现.
- 避免了DLX的时序问题
b. 缺点:
- 需要依赖特定的插件, 有运维⼯作
- 只适⽤特定版本
8. 事务
RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败
8.1 不采用事务发送
代码:
@RequestMapping("/trans")
public String trans(){
rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"trans test1..");
int num = 5/0;
rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"trans test2..");
return "发送成功";
}
发送消息:
发现:
第一条消息成功,第二条失败
8.2 配置事务管理器
@Bean
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);//开启事务
return rabbitTemplate;
}
//事务管理器
//不需要注入
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
发送消息:
@Transactional
@RequestMapping("/trans2")
public String trans2(){
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"trans test1..");
int num = 5/0;
transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"trans test2..");
return "发送成功";
}
此时无法发送:
注意:
不加 @Transactional , 会发现消息1发送成功
9. 消息分发
9.1 概念
RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者.
如果现在负载加重,那么只需要创建更多的消费者来消费 处理消息即可.
默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.
如何处理呢? 我们可以channel.basicQos⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量.
⽐如:
消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀ 条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息. 类似TCP/IP中的"滑动窗⼝".
prefetchCount 设置为0时表⽰没有上限.
basicQos 对拉模式的消费⽆效(后⾯再讲)
9.2 应用场景
1. 限流
2. ⾮公平分发
9.2.1 限流
如下使⽤场景:
订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求
但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮.
代码:
①配置prefetch参数, 设置应答⽅式为⼿动应答
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://xuexue:xuexue@119.91.154.99:5672/extension
listener:
simple:
acknowledge-mode: manual #消息接收确认
prefetch: 5
②配置交换机, 队列
@Configuration
public class QosConfig {
//消息确认
@Bean("qosQueue")
public Queue QosQueue(){
return QueueBuilder.durable(Constant.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).build();
}
@Bean("qosBinding")
public Binding ackBinding(@Qualifier("QosExchange") DirectExchange directExchange, @Qualifier("QosQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("qos");
}
}
③发送消息
@RequestMapping("/qos")
public String qos(){
//发送普通信息
for (int i=0;i<20;i++){
rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","qos test...");
}
return "发送成功";
}
④消费端
//@Component
public class QosListener {
@RabbitListener(queues = Constant.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
System.out.println("业务逻辑处理");
//肯定确认
//channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,true);
}
}
}
⑤观察结果
先不开启肯定确认:
只发送了五条
如果不设置prefetch参数呢?
20条全发送过去了
9.2.2 ⾮公平分发
⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲.
这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量
代码:
设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者.
①配置配置prefetch参数为1, 设置应答⽅式为⼿动应答
②启动两个消费者
第一个消费者慢,没有ack
//@Component
public class QosListener {
@RabbitListener(queues = Constant.QOS_QUEUE)
public void handMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("111接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
//处理一次两秒钟
Thread.sleep(2000);
System.out.println("业务逻辑处理");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,true);
}
}
//干的比较快,会ack
@RabbitListener(queues = Constant.QOS_QUEUE)
public void handMessage2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消费者逻辑
System.out.printf("222接收到消息: %s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8")
,message.getMessageProperties().getDeliveryTag());
//处理一次1秒钟
Thread.sleep(1000);
System.out.println("业务逻辑处理");
//肯定确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliveryTag,false,true);
}
}
}
deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的 deliveryTag是独⽴计数的