当前位置: 首页 > article >正文

RabbitMq--消息可靠性

12.消息可靠性

1.消息丢失的情况

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

image-20250310192901333

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性

2.生产者的可靠性

1.生产者重连

由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制

application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /blog
    username: CaiXuKun
    password: T1rhFXMGXIOYCoyi
    connection-timeout: 1s # 连接超时时间
    template:
      retry:
        enabled: true # 开启连接超时重试机制
        initial-interval: 1000ms # 连接失败后的初始等待时间
        multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
        max-attempts: 3 # 最大重试次数

注意事项: 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也可以考虑使用异步线程来执行发送消息的代码

2.生产者确认

RabbitMQ 提供了 Publisher ConfirmPublisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况

  • 消息投递到了 MQ,但是路由失败(业务原因),此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功

  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功

  • 其它情况都会返回 NACK,告知生产者消息投递失败

image-20250310194409045

生产者确认机制有关的配置信息( application.yml 文件)

spring:
  rabbitmq:
    publisher-returns: true
    publisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 配置回调
        rabbitTemplate.setReturnsCallback((returnedMessage) -> {
            System.out.println("收到消息的return callback, " +
                    "exchange = " + returnedMessage.getExchange() + ", " +
                    "routingKey = " + returnedMessage.getRoutingKey() + ", " +
                    "replyCode = " + returnedMessage.getReplyCode() + ", " +
                    "replyText = " + returnedMessage.getReplyText() + ", " +
                    "message = " + returnedMessage.getMessage());
        });
    }

}

添加一个测试类,测试 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {
    CorrelationData correlationData = new CorrelationData();
    correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
        if (confirm.isAck()) {
            // 消息发送成功
            System.out.println("消息发送成功,收到ack");
        } else {
            // 消息发送失败
            System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());
        }

        if (throwable != null) {
            // 消息回调失败
            System.err.println("消息回调失败");
        }
    });

    rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);

    // 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
    Thread.sleep(2000);
}

如何看待和处理生产者的确认信息

  • 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
  • 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息

3.消息代理(RabbitMQ)的可靠性

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  • 一旦 RabbitMQ 宕机,内存中的消息会丢失
  • 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象

**MQ 阻塞:**当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

1.数据持久化

RabbitMQ 实现数据持久化包括 3 个方面:

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

注意事项:利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)

2. LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)

从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
  2. 消费者要处理消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执

  • 在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    image-20250310200323362

  • 在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    //注解式创建
    @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(
            name = "lazy.queue2",
            durable = "true",
            arguments = @Argument(
                    name = "x-queue-mode",
                    value = "lazy"
            )
    ))
    public void listenLazeQueue(String message) {
        System.out.println("消费者收到了 laze.queue2的消息: " + message);
    }
    
    
    //编程式创建
    
    @Bean
    public org.springframework.amqp.core.Queue lazeQueue() {
        return QueueBuilder.durable("lazy.queue1")
                .lazy()
                .build();
    }
    

4.消费者的可靠性

1. 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  1. ack:成功处理消息,RabbitMQ 从队列中删除该消息
  2. nack:消息处理失败,RabbitMQ 需要再次投递消息
  3. reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  • none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活

  • auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:

    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject

开启消息确认机制,需要在 application.yml 文件中编写相关的配置

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none
2.失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力

我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队

在 application.yml 配置文件中开启失败重试机制

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true # 开启消息消费失败重试机制
          initial-interval: 1000ms # 消息消费失败后的初始等待时间
          multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
          max-attempts: 3 # 最大重试次数
          stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false

在达到最大重试次数后,消息会丢失!!!怎样解决?

3. 失败消息的处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

image-20250310203318152

我们来演示一下使用 RepublishMessageRecoverer 类的情况

  • 第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
    public class ErrorConfiguration {
    
        @Bean
        public DirectExchange errorExchange() {
            return new DirectExchange("error.direct", true, false);
        }
    
        @Bean
        public Queue errorQueue() {
            return new Queue("error.queue", true, false, false);
        }
    
        @Bean
        public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
            return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
        }
    
    }
    
    
  • 第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
    
    
  • 在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

    image-20250310203613628

  • 在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

    image-20250310203633853

总结:消费者如何保证消息一定被消费?

  • 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
  • 开启消费者失败重试机制,并设置 MessageRecoverer ,多次重试失败后将消息投递到异常交换机,交由人工处理
4. 业务幂等性

在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

image-20250310203936288

那么有什么方法能够确保业务的幂等性呢

方案一:为每条消息设置一个唯一的 id

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  1. 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  3. 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId

@Bean
public MessageConverter jacksonMessageConvertor() {
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息

image-20250310204104246

但这种方式对业务有一定的侵入性

方案二:结合业务判断 – 结合实例
兜底的解决方案

我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性


http://www.kler.cn/a/580021.html

相关文章:

  • 【由技及道】量子跃迁部署术:docker+jenkins+Harbor+SSH的十一维交付矩阵【人工智障AI2077的开发日志011】
  • word甲烷一键下标
  • 学习笔记11——并发编程之并发关键字
  • DMA在STM32中的应用
  • 如何选择开源向量数据库
  • 6、通过husky规范commit提交信息
  • 软考 数据通信基础——信道
  • 坐落于杭州的电商代运营公司品融电商
  • 服务器租用:静态BGP和动态BGP分别指什么?
  • 脚本学习(1)验证目录自动化生成脚本
  • 人工智能直通车系列14【机器学习基础】(逻辑回归原理逻辑回归模型实现)
  • 【CSS3】元婴篇
  • 常用的分布式 ID 设计方案
  • 【Linux系统】进程状态:一个进程的轮回史
  • 高频算法题精讲(Python解法)——算法+实际场景化拆解
  • Spring Boot+RabbitMQ+Canal 解决数据一致性
  • 【时间序列聚类】Feature-driven Time Series Clustering(特征驱动的时间序列聚类)
  • 为什么大模型网站使用 SSE 而不是 WebSocket?
  • 【JAVA】之路启航——初识Java篇
  • 基于Spring Cloud Alibaba的电商系统微服务化实战:从拆分到高可用部署