当前位置: 首页 > article >正文

RabbitMQ高级特性--重试特性

目录

1.重试配置

2.配置交换机&队列

3.发送消息

4.消费消息

5. 运行程序观察结果

6. 手动确认

 注意:


在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误, 那么多次重试也是没有用的, 可以设置重试次数

1.重试配置

spring:
  rabbitmq:
    addresses: amqp://study:study@你的服务器IP:15673/你的虚拟机名
    listener:
      simple:
        acknowledge-mode: auto #消息接收确认
      retry:
        enabled: true # 开启消费者失败重试
        initial-interval: 5000ms # 初始失败等待时⻓为5秒
        max-attempts: 5 # 最⼤重试次数(包括自身消费的一次)

2.配置交换机&队列

//重试机制
public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";


/重试机制 发布订阅模式
//1. 交换机
@Bean("retryExchange")
public Exchange retryExchange() {
 return
ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("retryQueue")
public Queue retryQueue() {
 return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange 
exchange, @Qualifier("retryQueue") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange);
}

3.发送消息

@RequestMapping("/retry")
public String retry(){
 rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry 
test...");
 return "发送成功!";
}

4.消费消息

@Component
public class RetryQueueListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constant.RETRY_QUEUE)
 public void ListenerQueue(Message message) throws Exception {
 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
 //模拟处理失败
 try {
 int num = 3/0;
 System.out.println("处理完成");
}catch (Exception e){
 System.out.println("处理失败");
}
 }
}

5. 运行程序观察结果

接收到消息: consumer ack test..., deliveryTag: 1
处理失败

6. 手动确认

@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());
 //模拟处理失败
 int num = 3/0;
 System.out.println("处理完成");
 //3. ⼿动签收
 channel.basicAck(deliveryTag, true);
 }catch (Exception e){
 //4. 异常了就拒绝签收
 Thread.sleep(1000);
 //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接
丢弃
 channel.basicNack(deliveryTag, true,true);
 }
}

运行结果:

接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11

可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现.

自动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异常, RabbitMQ 根据配置的重试参数自动将消息重新⼊队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.

手动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序可以通过自己的逻辑和利用RabbitMQ的高级特性来实现有效的重试策略

 注意:

1. 动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失
2. 手 动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是
unacked的状态, 导致消息积压


http://www.kler.cn/a/614417.html

相关文章:

  • 28_跨域
  • 并发编程模型
  • QT登录注册模块(客户端)
  • [特殊字符]《多商户家政系统技术解析:SpringBoot+MyBatisPlus+UniApp高效实战指南》
  • 力扣刷题494. 目标和
  • Linux环境下安装部署Docker
  • (二)创建实例
  • 去噪算法大比拼
  • Vite任意文件读取漏洞:CVE-2025-30208
  • DeepSeek-R1国产大模型实战:从私有化部署到内网穿透远程使用全攻略
  • HCIP之VRRP
  • ETL与数据迁移:从传统系统到云平台的平稳过渡全攻略
  • 【磁盘扩容】linux磁盘扩容
  • 【QA】QT有哪些状态模式的设计?
  • 最常使用的现代C++新特性介绍
  • 面试经典150题·LeetCode26·删除有序数组中的重复项·Java
  • C++中常见符合RAII思想的设计有哪些
  • leetcode 2360 图中最长的环 题解
  • 时序数据库 InfluxDB(一)
  • VMware虚拟机 ubuntu22.04无法与共享粘贴板和拖拽文件的解决方案