MQ 消息发送可靠性保证 —— 整合 Spring Retry 重试框架 + 补偿发送方案
MQ 消息发送可靠性保证 —— 整合 Spring Retry 重试框架 + 补偿发送方案
RocketMQ Starter 本身提供重试机制较为简单,无法指定较复杂的重试策略
复杂的重试策略 RocketMQ 无法直接配置:
- 间隔和延迟策略: RocketMQ 本身的重试机制没有内建对重试间隔和延迟时间的高级控制。例如,你不能简单地配置每次重试的延迟时间和间隔时间,或者实现指数级回退的策略。所有的重试都是在一个固定的时间内进行的,缺少对每次重试间隔的控制。
- 定制化重试规则: 如果你想要一个更复杂的重试规则(如重试间隔时间逐步增加、使用不同的间隔策略等),RocketMQ 的默认重试机制就比较难以满足。这种情况下,你需要自定义重试逻辑,比如使用 Spring Retry 来实现更复杂的策略。
Spring Retry 是一个用于为应用程序提供自动重试功能的框架,特别适用于执行可能会因暂时性问题失败的操作(如网络请求、数据库操作、消息队列操作等)。通过配置,Spring Retry 能够在失败时自动重试指定次数,且每次重试可以配置不同的延迟和间隔。
为什么使用 Spring Retry:
Spring Retry 的优势在于它能够提供比 RocketMQ 更细粒度的控制。你可以使用 Spring Retry 来设置如下复杂的策略:
- 自定义重试次数: 你可以灵活设置最大重试次数。
- 延迟策略: 可以配置不同的延迟时间,支持指数回退、固定间隔、随机延迟等。
- 重试间隔的乘法: 支持每次重试间隔成倍增加的策略(如 2x、3x 等)。
- 失败回调: 可以定义失败后执行的回调,如写入数据库等操作。
因此,RocketMQ 的内建重试机制在某些特定场景下,尤其是需要复杂间隔、延迟或其他高级控制时,可能不如 Spring Retry 这么灵活。在这种情况下,结合 Spring Retry 进行二次封装,能够提供更强大、更灵活的重试控制。
1.添加依赖
<!-- Spring Retry 重试框架 -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<!-- AOP 切面(Spring Retry 重试框架需要) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
2.启用retry
启动类上加上@EnableRetry
3.重试配置
Retry-yaml
retry:
max-attempts: 3 # 最大重试次数
init-interval: 1000 # 初始延迟时间,单位 ms
multiplier: 2 # 每次重试间隔加倍(每次乘以 2)
RetryProperties
@ConfigurationProperties(prefix = RetryProperties.PREFIX)
@Component
@Data
public class RetryProperties {
public static final String PREFIX = "retry";
/**
* 最大重试次数
*/
private Integer maxAttempts = 3;
/**
* 初始间隔时间,单位 ms
*/
private Integer initInterval = 1000;
/**
* 乘积(每次乘以 2)
*/
private Double multiplier = 2.0;
}
RetryTemplate
@Configuration
public class RetryConfig {
@Resource
private RetryProperties retryProperties;
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 定义重试策略(最多重试 3 次)
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts()); // 最大重试次数
// 定义间隔策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔 2000ms
backOffPolicy.setMultiplier(retryProperties.getMultiplier()); // 每次乘以 2
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
}
4.配置线程池
@Configuration
public class ThreadPoolConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(50);
// 队列容量
executor.setQueueCapacity(200);
// 线程活跃时间(秒)
executor.setKeepAliveSeconds(30);
// 线程名前缀
executor.setThreadNamePrefix("NoteExecutor-");
// 拒绝策略:由调用线程处理(一般为主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
5.配置mq异步发送工具类
@Component
@Slf4j
public class SendMqRetryHelper {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private RetryTemplate retryTemplate;
@Resource(name = "taskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* 异步发送 MQ
* @param topic
*/
public void asyncSend(String topic, String body) {
log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
Message<String> message = MessageBuilder.withPayload(body)
.build();
// 异步发送 MQ 消息,提升接口响应速度
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
handleRetry(topic, message);
}
});
}
/**
* 重试处理
* @param topic
* @param message
*/
private void handleRetry(String topic, Message<String> message) {
// 异步处理
threadPoolTaskExecutor.submit(() -> {
try {
// 通过 retryTemplate 执行重试
retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
// 同步发送 MQ
rocketMQTemplate.syncSend(topic, message);
return null;
});
} catch (Exception e) {
// 多次重试失败,进入兜底方案
fallback(e, topic, message.getPayload());
}
});
}
/**
* 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
*/
private void fallback(Exception e, String topic, String bodyJson) {
log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);
// TODO:
}
}
首先,RocketMQ 会异步发送消息并进行重试(取决于你的配置)。
如果 RocketMQ 异步发送失败并且重试 3 次后依然失败,onException
方法被调用。
在 onException
中,handleRetry
方法会被触发,该方法会调用 retryTemplate.execute(...)
来进行同步重试(取决于你的配置)。
如果所有重试失败,则会调用 fallback
方法进行兜底处理。
通常的话,这里mq的重试就配置成0了因为我们已经自己封装了重试机制
6.服务层使用
// 发送 MQ (包含重试机制)
sendMqRetryHelper.asyncSend(MQConstants.TOPIC, JsonUtils.toJsonString(MqDTO));