当前位置: 首页 > article >正文

RabbitMQ--发送方确认及消息重试

(一)发送方确认

  之前我们在七大工作模式中简单说了发送方确认,就是生产者到RabbitMQ这一过程中,消息是否正确到达服务器,生产者要进行确认的过程

 一共有两种确认模式

1.confirm确认模式

  是生产者到交换机的阶段,生产者进行确认的过程,之后还有return模式,是交换机到消息队列,确认的过程

 生产者发送消息的时候,对发送端设置一个ConfirmCallback监听,无论消息是否到达交换机,这个监听都会执行,如果交换机收到了,就会ACK就会为true否则为false

然后我们来看代码

首先还是要更改配置文件

spring:
  rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
#        acknowledge-mode: NONE
#       acknowledge-mode: AUTO
        acknowledge-mode: MANUAL
    publisher-confirm-type: correlated  #消息发送确认

然后声明交换机和队列

 

    @Bean("confirmExchange")
    public Exchange confirmExchange(){
        return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();
    }
    @Bean("confirmQueue")
    public Queue comfirmQueue(){
        return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
    }
    @Bean("confirmBind")
    public Binding confirmBind(@Qualifier("confirmExchange") Exchange confrimExchange,@Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(confrimExchange).with("confirm").noargs();
    }

然后我们来看生产者代码(有一点问题,错误示范)

public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")
    public String confirmRabbit(){
//        RabbitTemplate rabbitTemplate1=new RabbitTemplate();
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            //回调方法
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("执行回调方法");
                if(ack){
                    System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
                }else {
                    System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
                    //业务处理
                }
            }
        });
        CorrelationData correlationData=new CorrelationData("1");
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm模式",correlationData);
        return "发送成功";
    }
}

之所以说是错误示范,我们可以先执行一下试试

 我们发现也没什么错误啊,我们再来发送一遍

 

这时就会报错了,告诉我们一个RabbitTemplate只能支持一个回调方法

  之所以会报这个错,是因为我们每次调用接口,都会给这个RabbitTemplate再设置一遍回调方法就会出错,

  还有一个问题就是,我们既然是针对RabbitTemplate来设置的,那么所有使用此RabbitTemplate的接口,都会被影响,所以我们应该把他设置为多例的

 那接下来我们就来看看怎么设置多个

@Configuration
public class Rabbit {
    @Bean
     public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
         RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);
         confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
             //回调方法
             @Override
             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                 System.out.println("执行回调方法");
                 if(ack){
                     System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
                 }else {
                     System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
                     //业务处理
                 }
             }
         });
         return confirmRabbitTemplate;
     }
}

上面 的ConnectionFactory是我们配置文件写好后,自动给我们创建的,用来创建RabbitTemplate

那接下来我们使用这个新创建的ConfirmRabbitTemplate来测试下是否能解决我们的问题

我多次访问了接口,没有问题,那我们接口不能重复访问的问题就解决了,那能不能解决我们不同接口,被同时影响的问题?

我们发现还是不可以的,这是因为我们本身注入的是Spring框架给我们提供的,我们如果写了一个RabbitTemplate之后,spring就不会再给我们提供了,就会导致Autowired注解,通过类型查找,只能找到一个RabbitTemplate,就使用了,如果我们想解决这个问题,就需要我们自己再另外创建一个

  @Bean
    public RabbitTemplate RabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate RabbitTemplate =new RabbitTemplate(connectionFactory);
        return RabbitTemplate;
    }

这样我们再来重新试一下 

此时我们第二种问题也正确了 

此时我们再来看,如果我们故意放一个错误的routingkey

我们发现还是执行正常,但是我们的交换机根本映射不到队列中,还是会造成消息丢失,此时该如何处理,这是就轮到我们的return模式了

 2.return退回模式

  消息到达交换机,根据路由规则匹配并把消息放到队列中,在这个过程中,如果一个队列都无法匹配成功,就可以通过此模式把消息退回给生产者

  代码跟confirm模式很像,而且同样也会有上面两种问题,所以这里不再说了,直接上代码

 @Bean
     public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
         RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);
         confirmRabbitTemplate.setMandatory(true);
         //也是一种回调函数
         confirmRabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
             @Override
             public void returnedMessage(ReturnedMessage returned) {
                 System.out.println(returned.getMessage().getBody()+"被退回");
             }
         });
         confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
             //回调方法
             @Override
             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                 System.out.println("执行回调方法");
                 if(ack){
                     System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
                 }else {
                     System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
                     //业务处理
                 }
             }
         });
         return confirmRabbitTemplate;
     }

记得我们要设置SetMandory为(true)否则不执行 

3.如何保证RabbitMQ的消息可靠传输(面试题)

  我们一步步说

1)首先是生产者到交换机,此时可能会因为网络问题,导致生产者的消息发送到交换机失败,此时我们可以通过发送方确认中的confirm模式来解决

2)交换机无法找到对应队列,我们可以通过发送方确认中的return模式来处理返回消息

3)RabbitMQ服务宕机了,我们可以设置消息的持久化,队列持久化,交换机持久化,来保存消息到硬盘中,但是可能会有一些消息在缓冲区中还没有写入硬盘(这些消息就会丢失)

4)RabbitMQ到消费者过程中,可能因为消费者代码或者网络问题,造成消息丢失,此时可以用消息确认来解决(自动确认,手动确认)

(二)重试机制

 在消息传递过程中,会遇到一些问题,可能会导致消费者消费消息的时候,处理失败,那对此RabbitMQ给我们提供了自动确认和手动确认,同时也给我们提供了重试机制,允许在消息处理失败后,重新发送

  重试机制,在自动确认和手动确认的时候是没用的,所以我们需要设置为auto,当消息正常处理时就自动确认,当抛出异常时则不会确认消息

  代码如下:

首先我们还是要更改配置

spring:
  rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
#        acknowledge-mode: NONE
        acknowledge-mode: AUTO
#        acknowledge-mode: MANUAL
        retry:
          enabled: true # 开启消费者失败重试 
          initial-interval: 5000ms # 初始失败等待时⻓为5秒 
          max-attempts: 5
    publisher-confirm-type: correlated  #消息发送确认

   我们看到一共发送了五条,那我们发现这里的ID都是一样的,因为他们还是同一条消息, 如果我们是手动确认时出现了问题重新发送,他是先重新入队列再发送,他的ID是会递增的,而重试则不会

  但是我们要注意,此时我们是没有捕获异常的,就让他往上抛才会触发重试,如果我们捕获了,就不会触发重试机制  同时我们刚刚配置最多是发五次,我们在发送时是unack状态

如果我们五次后还没有正确接收到消息,消息会自动确认就会丢失

 


http://www.kler.cn/a/508630.html

相关文章:

  • XML在线格式化 - 加菲工具
  • 构建一个简单的深度学习模型
  • OpenCV相机标定与3D重建(55)通用解决 PnP 问题函数solvePnPGeneric()的使用
  • cursor重构谷粒商城02——30分钟构建图书管理系统【cursor使用教程番外篇】
  • Maven 配置本地仓库
  • 在VS2022中用C++连接MySQL数据库读取数据库乱码问题
  • 数仓建模(三)建模三步走:需求分析、模型设计与数据加载
  • (二)异步处理机制(Asynchronous Processing)
  • Spring Boot 中logback无法对warn警告日志发送邮件
  • 使用SIPP发起媒体流性能测试详解
  • PyBroker:利用 Python 和机器学习助力算法交易
  • 自动驾驶占用网格预测
  • Ruby JSON 优化之路:性能提升的探索与实践
  • 文档智能:OCR+Rocketqa+layoutxlm <Rocketqa>
  • 【Kotlin】上手学习之控制流程篇
  • ReaderLM v2:HTML 转 Markdown 和 JSON 的前沿小型语言模型
  • 常见安全风险及防护(如CSRF,XSS) 配置SSL/TLS
  • 分类统计字符个数(PTA)C语言
  • mysql主从复制sql进程中断,报错Tablespace is missing for table ……
  • Vue 3 中的 defineExpose
  • C语言之字符函数和字符串函数(上)
  • Vue3实现表格搜索内容高亮
  • Kotlin Bytedeco OpenCV 图像图像57 图像ROI
  • BUUCTF Web
  • 哪些新兴技术对智能驾驶汽车影响最大?
  • Neo4j与Python交互