当前位置: 首页 > article >正文

MQ 消息发送可靠性保证 —— 整合 Spring Retry 重试框架 + 补偿发送方案

MQ 消息发送可靠性保证 —— 整合 Spring Retry 重试框架 + 补偿发送方案

RocketMQ Starter 本身提供重试机制较为简单,无法指定较复杂的重试策略

复杂的重试策略 RocketMQ 无法直接配置:

  1. 间隔和延迟策略: RocketMQ 本身的重试机制没有内建对重试间隔和延迟时间的高级控制。例如,你不能简单地配置每次重试的延迟时间和间隔时间,或者实现指数级回退的策略。所有的重试都是在一个固定的时间内进行的,缺少对每次重试间隔的控制。
  2. 定制化重试规则: 如果你想要一个更复杂的重试规则(如重试间隔时间逐步增加、使用不同的间隔策略等),RocketMQ 的默认重试机制就比较难以满足。这种情况下,你需要自定义重试逻辑,比如使用 Spring Retry 来实现更复杂的策略。

Spring Retry 是一个用于为应用程序提供自动重试功能的框架,特别适用于执行可能会因暂时性问题失败的操作(如网络请求、数据库操作、消息队列操作等)。通过配置,Spring Retry 能够在失败时自动重试指定次数,且每次重试可以配置不同的延迟和间隔。

为什么使用 Spring Retry:

Spring Retry 的优势在于它能够提供比 RocketMQ 更细粒度的控制。你可以使用 Spring Retry 来设置如下复杂的策略:

  • 自定义重试次数: 你可以灵活设置最大重试次数。
  • 延迟策略: 可以配置不同的延迟时间,支持指数回退、固定间隔、随机延迟等。
  • 重试间隔的乘法: 支持每次重试间隔成倍增加的策略(如 2x、3x 等)。
  • 失败回调: 可以定义失败后执行的回调,如写入数据库等操作。

因此,RocketMQ 的内建重试机制在某些特定场景下,尤其是需要复杂间隔、延迟或其他高级控制时,可能不如 Spring Retry 这么灵活。在这种情况下,结合 Spring Retry 进行二次封装,能够提供更强大、更灵活的重试控制。

1.添加依赖

     <!-- Spring Retry 重试框架  -->
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>
        
        <!-- AOP 切面(Spring Retry 重试框架需要) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

2.启用retry

启动类上加上@EnableRetry

3.重试配置

Retry-yaml

retry:
  max-attempts: 3 # 最大重试次数
  init-interval: 1000 # 初始延迟时间,单位 ms
  multiplier: 2 # 每次重试间隔加倍(每次乘以 2)

RetryProperties

@ConfigurationProperties(prefix = RetryProperties.PREFIX)
@Component
@Data
public class RetryProperties {

    public static final String PREFIX = "retry";

    /**
     * 最大重试次数
     */
    private Integer maxAttempts = 3;

    /**
     * 初始间隔时间,单位 ms
     */
    private Integer initInterval = 1000;

    /**
     * 乘积(每次乘以 2)
     */
    private Double multiplier = 2.0;

}

RetryTemplate

@Configuration
public class RetryConfig {

    @Resource
    private RetryProperties retryProperties;

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // 定义重试策略(最多重试 3 次)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts()); // 最大重试次数

        // 定义间隔策略
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔 2000ms
        backOffPolicy.setMultiplier(retryProperties.getMultiplier());       // 每次乘以 2

        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        return retryTemplate;
    }
}

4.配置线程池

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(10);
        // 最大线程数
        executor.setMaxPoolSize(50);
        // 队列容量
        executor.setQueueCapacity(200);
        // 线程活跃时间(秒)
        executor.setKeepAliveSeconds(30);
        // 线程名前缀
        executor.setThreadNamePrefix("NoteExecutor-");

        // 拒绝策略:由调用线程处理(一般为主线程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
        executor.setAwaitTerminationSeconds(60);

        executor.initialize();
        return executor;
    }
}

5.配置mq异步发送工具类

@Component
@Slf4j
public class SendMqRetryHelper {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Resource
    private RetryTemplate retryTemplate;
    @Resource(name = "taskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * 异步发送 MQ
     * @param topic
     */
    public void asyncSend(String topic, String body) {
        log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);

        // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
        Message<String> message = MessageBuilder.withPayload(body)
                .build();

        // 异步发送 MQ 消息,提升接口响应速度
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("==> 【评论发布】MQ 发送异常: ", throwable);
                handleRetry(topic, message);
            }
        });
    }

    /**
     * 重试处理
     * @param topic
     * @param message
     */
    private void handleRetry(String topic, Message<String> message) {
        // 异步处理
        threadPoolTaskExecutor.submit(() -> {
            try {
                // 通过 retryTemplate 执行重试
                retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
                    log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
                    // 同步发送 MQ
                    rocketMQTemplate.syncSend(topic, message);
                    return null;
                });
            } catch (Exception e) {
                // 多次重试失败,进入兜底方案
                fallback(e, topic, message.getPayload());
            }
        });
    }

    /**
     * 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
     */
    private void fallback(Exception e, String topic, String bodyJson) {
        log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);

        // TODO:
    }
}

首先,RocketMQ 会异步发送消息并进行重试(取决于你的配置)。

如果 RocketMQ 异步发送失败并且重试 3 次后依然失败,onException 方法被调用。

onException 中,handleRetry 方法会被触发,该方法会调用 retryTemplate.execute(...) 来进行同步重试(取决于你的配置)。

如果所有重试失败,则会调用 fallback 方法进行兜底处理。

通常的话,这里mq的重试就配置成0了因为我们已经自己封装了重试机制

6.服务层使用

// 发送 MQ (包含重试机制)
        sendMqRetryHelper.asyncSend(MQConstants.TOPIC, JsonUtils.toJsonString(MqDTO));

http://www.kler.cn/a/578089.html

相关文章:

  • 【网络协议详解】——路由策略技术(学习笔记)
  • Apache Kafka 在生产环境中的管理与优化:从理论到实践
  • manus本地部署使用体验
  • 使用 Java 执行 SQL 语句和存储过程
  • [含文档+PPT+源码等]精品基于Python实现的校园小助手小程序的设计与实现
  • Java面试第九山!《SpringBoot框架》
  • Golang:实时消息交互系统
  • 物联网中 对设备监测和设备控制
  • C语言学习笔记-进阶(7)字符串函数3
  • 树莓派学习(一)——3B+环境配置与多用户管理及编程实践
  • SQL注入的原理及详细运用
  • 在 Docker 中搭建GBase 8s主备集群环境
  • Banana Pi OpenWRT One Wifi6 OpenWrt社区官方开源路由器评测
  • mysql忘记初始临时密码解决方法
  • 夏门大学DeepSeek 手册:从社会大众到高校及企业的全面应用实践研究(附 PDF 下载)
  • 2025年渗透测试面试题总结-长某亭科技-安全服务工程师(二面) (题目+回答)
  • react任务调度(简单版)和最小堆算法
  • Leetcode 62: 不同路径
  • 掌握Kubernetes Network Policy,构建安全的容器网络
  • 蓝桥备赛(13)- 链表和 list(下)