当前位置: 首页 > article >正文

RabbitMQ高级特性--消息确认机制

目录

一、消息确认

1.消息确认机制

2.手动确认方法

二、代码示例

1. AcknowledgeMode.NONE

1.1 配置文件

1.2 生产者

 1.3 消费者

1.4 运行程序 

 2.AcknowledgeMode.AUTO

3.AcknowledgeMode.MANUAL


一、消息确认

1.消息确认机制

生产者发送消息之后,到达消费端之后,可能会有以下情况:

1. 消息处理成功;

2. 消息处理异常。

RabbitMQ向消费者发送消息后,就会把这条消息删除掉,那么第二种情况就会造成消息丢失。

那么如何确保消息端已经被成功接收了并且被正确处理了呢?

为了确保消息从队列可靠的到达消费者,RabbitMQ提供了消息确认机制(Messageacknowledment)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数,消息确认机制分为以下两种:

自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的接收到消息,自动确认模式适用于对于消息可靠性要求不高的场景。

手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式的调用BasicAck命令,回复确认信号后才从内存(或者磁盘)中删除,这种方式适用于对消息可靠性要求较高的场景。

自动确认代码示例:

DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

当autoAck参数置为false,对于RabbitMQ服务器来说,队列中的消息分为了两个部分:

一是等待发送给消费者的消息;二是已经发送给消费者,但是还没收到消费者确认信号的消息。

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会重新安排这条消息进入队列,等待投递给下一个消费者,当然也有可能是原来的那个消费者。

从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。 

Ready:等待投递给消费者的消息数。

Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数。

2.手动确认方法

消费者在收到消息后,可以选择确认,也可以选择跳过或者直接拒绝确认,RabbitMQ也提供了不同的确认方法,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:

肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);

RabbitMQ 已经知道该消息并且成功的处理消息,可以将其丢弃。

参数说明:

deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值,deliveryTag是每个信道(Channel)独立维护的,所以在每个信道上都是唯一的,当消费者确认(ack)一条消息时,必须使用对应的信道进行确认。

multiple:是否批量确认,在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认,值为true则会一次性ack所以小于等于指定deliveryTag的消息,值为false,则只确认当前deliveryTag的消息。

deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺
序性。
否定确认: Channel.basicReject(long deliveryTag, boolean requeue);

参数说明:

deliveryTag:参考上文。

requeue:表示拒绝后,这条消息该如何处理,如果值为true那么,则RabbitMQ会将这条消息重新入队,重新发送给下一个订阅的消费者,值为false,则RabbitMQ会把这条消息从队列中移除,不会再发送给消费者。

否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
boolean requeue);

参数说明:

参考上文 

multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

二、代码示例

我们基于SpringBoot来演示消息的确认机制,使用方式和方法与RabbitMQ Java Client有一定差异,

Spring AMQP对消息确认提供了三种策略:

public enum AcknowledgeMode {
 NONE,
 MANUAL,
 AUTO;
}
. AcknowledgeMode.NONE:
这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认
消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
  AcknowledgeMode.AUTO(默认):
这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确
认消息.
  AcknowledgeMode.MANUAL:
⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消
息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这
种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被
重新处理.

1. AcknowledgeMode.NONE

1.1 配置文件

spring:
 rabbitmq:
 addresses: amqp:
 listener:
 simple:
 acknowledge-mode: none

1.2 生产者

public class Constant {
 public static final String ACK_EXCHANGE_NAME = "ack_exchange";
 public static final String ACK_QUEUE = "ack_queue";
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
@Bean("ackExchange")
public Exchange ackExchange(){
 return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {
 return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, 
@Qualifier("ackQueue") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
import com.xiaowu.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @RequestMapping("/ack")
 public String ack(){
 rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", 
"consumer ack test...");
 return "发送成功!";
 }
}

 1.3 消费者

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constant.ACK_QUEUE)
 public void ListenerQueue(Message message, Channel channel) throws
Exception {
 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
 //模拟处理失败
 int num = 3/0;
 System.out.println("处理完成");
 }
}

1.4 运行程序 

启动生产者可以从RabbitMQ Web管理界面看到如下:

再启动消费者,控制台输出:

接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....

管理界面:

可以看到消息处理失败但是消息已经从管理界面移除。 

 2.AcknowledgeMode.AUTO

将配置文件修改为:

spring:
 rabbitmq:
 addresses: amqp:
 listener:
 simple:
 acknowledge-mode: auto

再次启动程序,控制台不断输出错误信息:

接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
 
接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
 
接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
 
接收到消息: consumer ack test..., deliveryTag: 4
2024-04-29T17:07:09.254+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
 
从⽇志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发. 由于异常,多次重试还是失败,消 息没被确认,也无法nack,就⼀直是unacked状态,导致消息积压。

3.AcknowledgeMode.MANUAL

spring:
 rabbitmq:
 addresses: amqp:
 listener:
 simple:
 acknowledge-mode: manual

消费者手动确认逻辑:

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constant.ACK_QUEUE)
 public void ListenerQueue(Message message, Channel channel) throws
Exception {
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
 //1. 接收消息
 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
 //2. 处理业务逻辑
 System.out.println("处理业务逻辑");
 //⼿动设置⼀个异常, 来测试异常拒绝机制
// int num = 3/0;
 //3. ⼿动签收
 channel.basicAck(deliveryTag, true);
 } catch (Exception e) {
 //4. 异常了就拒绝签收
 //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 
则直接丢弃
 channel.basicNack(deliveryTag, true,true);
 }
 }
}

 这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。

异常时拒绝:

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constant.ACK_QUEUE)
 public void ListenerQueue(Message message, Channel channel) throws
Exception {
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
 //1. 接收消息
 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
 //2. 处理业务逻辑
 System.out.println("处理业务逻辑");
 //⼿动设置⼀个异常, 来测试异常拒绝机制
 int num = 3/0;
 //3. ⼿动签收
 channel.basicAck(deliveryTag, true);
 } catch (Exception e) {
 //4. 异常了就拒绝签收
 //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 
则直接丢弃
 channel.basicNack(deliveryTag, true,true);
 }
 }
}
运⾏结果: 消费异常时不断重试, deliveryTag 从1递增
控制台日志:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑

管理页面上unacked也是1:


http://www.kler.cn/a/579357.html

相关文章:

  • 创建Electron35 + vue3 + electron-builder项目,有很过坑,记录过程
  • 大白话react第十八章React 与 WebGL 项目的高级拓展与优化
  • Java常见的并发设计模式
  • Elastic:AI 会开始取代网络安全工作吗?
  • 小程序事件系统 —— 33 事件传参 - data-*自定义数据
  • blender 坐标系 金属度
  • Linux 4.4 内核源码的目录结构及其主要内容的介绍
  • 使用python自动提取文本关键词
  • 【文献阅读】On-Device Language Models: A Comprehensive Review
  • LVGL开发说明
  • YOLOv10改进之MHAF(多分支辅助特征金字塔)
  • 【爬虫】开篇词
  • SSH/HTTP/HTTPS
  • Manus:革新未来的智能助手
  • 用R语言的XML库写一个采集图片的爬虫程序
  • 基于编译器特性浅析C++程序性能优化
  • 没有最好的,只有最合适的:重新认识测试工具的价值
  • 第十五届蓝桥杯----B组cpp----真题解析(小白版本)
  • PostgreSQL、SQL Server和MySQL数据库性能调优与故障排除技术
  • 用向量数据库建立本地知识库