当前位置: 首页 > article >正文

【RabbitMQ】04-发送者可靠性

在这里插入图片描述

1. 生产者重试机制

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

2. 生产者确认机制

在这里插入图片描述

1. 配置

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

2.定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

3.定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

http://www.kler.cn/a/389640.html

相关文章:

  • qt QVideoWidget详解
  • Kubernetes在容器编排中的应用
  • python实战(八)——情感识别(多分类)
  • LeetCode【0014】最长公共前缀
  • 外星人入侵
  • Rust学习(二):rust基础语法Ⅰ
  • Spark中给读取到的数据 的列 重命名的几种方式!
  • 如何使用 Web Scraper API 高效采集 Facebook 用户帖子信息
  • 跨域及解决跨域
  • 使用腾讯地图的 IP 定位服务。这里是正确的实现方式
  • 字节青训-游戏排名第三大的分数、补给站最优花费问题
  • vite-plugin-electron 库作用
  • 细说STM32单片机USART中断收发RTC实时时间并改善其鲁棒性的另一种方法
  • 5G NR:各物理信道的DMRS配置
  • 【划分型 DP-最优划分】力扣2707. 字符串中的额外字符
  • 解决程序因缺少xinput1_3.dll无法运行的有效方法,有效修复丢失xinput1_3.dll
  • WPF的<ContentControl>控件
  • 常用的损失函数pytorch实现
  • 批量清除Word Excel PPT文件打开密码
  • 让redis一直开启服务/自动启动
  • wordpress站外调用指定ID分类下的推荐内容
  • i2c-tools 4.3 for Android 9.0
  • stm32 ADC实例解析(3)-多通道采集互相干扰的问题
  • PySimpleGUI库和pymysql库
  • 探索计算机互联网的奇妙世界:从基础到前沿的无尽之旅
  • 2024 年 Java 面试正确姿势(1000+ 面试题附答案解析)