基于本地事务表+MQ实现分布式事务
基于本地事务表+MQ实现分布式事务
- 引言
- 1、原理
- 2、本地消息表优缺点
- 3、本地启动rocketmq
- 4、代码实现及验证
- 4.1、核心代码
- 4.2、代码执行流程
- 4.3、项目结构
- 4.4、项目源码
引言
本地消息表的方案最初由ebay的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理。本地消息表实现最终一致性。本文主要学习mallchat开源项目中的实现方案,向mallchat开发团队致敬!
1、原理
分布式事务解决方案有许多比如二阶段提交、TCC、最大努力通知、Saga事务等,本文介绍本地消息表+MQ这种方式解决分布式事务消息最总一致性问题。目前利用本地消息表+MQ方案实现最终消息一致性的比较多,它的核心思想是,将分布式事务拆分成本地事务进行处理,不同事务之间通过消息表和MQ通信,最后通过定时任务扫描失败的数据进行重试,当在有效重试次数限制内,再次重试回调失败的数据,最终实现消息重复发送,达到一致性。
2、本地消息表优缺点
本地消息表实现了分布式事务的最终一致性,优缺点比较明显。
优点
1.实现逻辑简单,开发成本比较低
缺点
1.与业务场景绑定,高耦合,不可公用
2.本地消息表与业务数据表在同一个库,占用业务系统资源,量大可能会影响数据库性能
3、本地启动rocketmq
首先本地启动rocketmq服务。
1:在D:\rocketmq-all-5.3.1-bin-release\bin目录下,执行cmd命令
启动服务
start mqnamesrv.cmd
启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
关闭服务
mqshutdown namesrv
关闭broker
mqshutdown broker
2:启动rocketmq-dashboard项目,输入127.0.0.1:8080进入可视化界面
其中rocketmq-dashboard可视化代码参考windows下安装启动rocketmq可视化界面
将rocketmq-dashboard导入到idea中,在idea编译启动。
4、代码实现及验证
4.1、核心代码
(1)注解SecureInvoke
/**
* 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。
*
* @author hauhua
* @DATE 2025/1/21
*/
@Retention(RetentionPolicy.RUNTIME)//运行时生效
@Target(ElementType.METHOD)//作用在方法上
public @interface SecureInvoke {
/**
* 默认3次
*
* @return 最大重试次数(包括第一次正常执行)
*/
int maxRetryTimes() default 3;
/**
* 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。
* 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。
*
* @return 是否异步执行
*/
boolean async() default true;
}
(2)自定义切面SecureInvokeAspect
/**
* 安全执行切面
*
* @author hauhua
* @DATE 2025/1/21
*/
@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行
@Component
public class SecureInvokeAspect {
@Autowired
private SecureInvokeService secureInvokeService;
@Around("@annotation(secureInvoke)")
public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable {
boolean async = secureInvoke.async();
boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
// 非事务状态,直接执行,不做任何保证。
if (!inTransaction) {
return joinPoint.proceed();
}
// 如果是事务状态,保证保存到本地消息表中的数据和保存在数据库中数据,在同一个事务内
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList());
SecureInvokeVo dto = SecureInvokeVo.builder()
.args(JsonUtils.toStr(joinPoint.getArgs()))
.className(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(JsonUtils.toStr(parameters))
.build();
SecureInvokeRecord record = SecureInvokeRecord.builder()
.secureInvokeVo(dto)
.maxRetryTimes(secureInvoke.maxRetryTimes())
.nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES))
.createTime(new Date())
.updateTime(new Date())
.build();
secureInvokeService.invoke(record, async);
return null;
}
}
(3)控制层MessageController
/**
* 消息管理
*
* @author hauhua
* @DATE 2025/1/21
**/
@RestController
public class MessageController {
@Autowired
private MQProducerService mqProducerService;
@RequestMapping("sendMsg")
@Transactional
public void sendMsg(@RequestParam String topic, @RequestBody Object body) {
mqProducerService.sendSecureMsg(topic, body,"test");
}
}
(4)消息Service层MQProducerService
/**
* 发送mq工具类
*
* @author hauhua
* @DATE 2025/1/21
*/
@Slf4j
public class MQProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMsg(String topic, Object body) {
Message<Object> build = MessageBuilder.withPayload(body).build();
rocketMQTemplate.send(topic, build);
}
/**
* 发送可靠消息,在事务提交后保证发送成功
*
* @param topic
* @param body
*/
@SecureInvoke
public void sendSecureMsg(String topic, Object body, Object key) {
// 通过切面实现本地消息表,因为要在此方法执行前将数据保存到本地消息表,进行功能增强
// 模拟发送mq消息有1/3概率失败
if(RandomUtil.randomInt(3) >= 1){
log.info("sendSecureMsg fail");
throw new IllegalStateException();
}
Message<Object> build = MessageBuilder
.withPayload(body)
.setHeader("KEYS", key)
.build();
log.info("sendSecureMsg start");
rocketMQTemplate.send(topic, build);
log.info("sendSecureMsg end");
}
}
(5)安全执行处理器SecureInvokeService
@Slf4j
@AllArgsConstructor
public class SecureInvokeService {
public static final double RETRY_INTERVAL_MINUTES = 2D;
private final SecureInvokeRecordDao secureInvokeRecordDao;
private final Executor executor;
// 每5秒执行一次定时任务,从数据库中获取所有等待重试的记录,并异步调用这些记录对应的方法
@Scheduled(cron = "*/5 * * * * ?")
public void retry() {
List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords();
if (secureInvokeRecords.isEmpty()) {
log.info("SecureInvokeService retry no record");
return;
}
for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) {
doAsyncInvoke(secureInvokeRecord);
}
}
public void save(SecureInvokeRecord record) {
secureInvokeRecordDao.save(record);
}
private void retryRecord(SecureInvokeRecord record, String errorMsg) {
Integer retryTimes = record.getRetryTimes() + 1;
SecureInvokeRecord update = new SecureInvokeRecord();
update.setId(record.getId());
update.setFailReason(errorMsg);
update.setNextRetryTime(getNextRetryTime(retryTimes));
update.setUpdateTime(new Date());
if (retryTimes > record.getMaxRetryTimes()) {
update.setStatus(SecureInvokeRecord.STATUS_FAIL);
} else {
update.setRetryTimes(retryTimes);
}
secureInvokeRecordDao.updateById(update);
}
private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法
double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m
return DateUtil.offsetMinute(new Date(), (int) waitMinutes);
}
private void removeRecord(Long id) {
secureInvokeRecordDao.removeById(id);
}
public void invoke(SecureInvokeRecord record, boolean async) {
boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
//非事务状态,直接执行,不做任何保证。
if (!inTransaction) {
return;
}
//保存执行数据
record.setStatus(SecureInvokeRecord.STATUS_WAIT);
save(record);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@SneakyThrows
@Override
public void afterCommit() {
//事务后执行
if (async) {
doAsyncInvoke(record);
} else {
doInvoke(record);
}
}
});
}
public void doAsyncInvoke(SecureInvokeRecord record) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
doInvoke(record);
});
}
public void doInvoke(SecureInvokeRecord record) {
SecureInvokeVo secureInvokeVo = record.getSecureInvokeVo();
try {
// 构造method以及对应的类和method中的args参数
SecureInvokeHolder.setInvoking();
Class<?> beanClass = Class.forName(secureInvokeVo.getClassName());
Object bean = SpringUtil.getBean(beanClass);
List<String> parameterStrings = JsonUtils.toList(secureInvokeVo.getParameterTypes(), String.class);
List<Class<?>> parameterClasses = getParameters(parameterStrings);
Method method = ReflectUtil.getMethod(beanClass, secureInvokeVo.getMethodName(), parameterClasses.toArray(new Class[]{}));
Object[] args = getArgs(secureInvokeVo, parameterClasses);
// 执行方法,回调自己在事务提交后的逻辑
method.invoke(bean, args);
// 执行成功则删除secure_invoke_record对应的数据,说明消息已经发送成功
removeRecord(record.getId());
} catch (Throwable e) {
log.error("SecureInvokeService invoke fail", e);
// 执行失败,等待下次执行(用于重试,下一次执行时间会有对应的算法)
retryRecord(record, e.getMessage());
} finally {
SecureInvokeHolder.invoked();
}
}
@NotNull
private Object[] getArgs(SecureInvokeVo secureInvokeVo, List<Class<?>> parameterClasses) {
JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeVo.getArgs());
Object[] args = new Object[jsonNode.size()];
for (int i = 0; i < jsonNode.size(); i++) {
Class<?> aClass = parameterClasses.get(i);
args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass);
}
return args;
}
@NotNull
private List<Class<?>> getParameters(List<String> parameterStrings) {
return parameterStrings.stream().map(name -> {
try {
return Class.forName(name);
} catch (ClassNotFoundException e) {
log.error("SecureInvokeService class not fund", e);
}
return null;
}).collect(Collectors.toList());
}
}
4.2、代码执行流程
由于在代码中写死了模拟1/3概率发送消息失败,所以刚开始会将数据插入secure_invoke_record,然后通过TransactionSynchronizationManager执行后置处理,最后通过定时任务,扫描失败的任务,从secure_invoke_record取出数据利用MQ再次重试,并且删除secure_invoke_record表刚刚重试的数据,最后执行完成,达到消息最终一致性。
(1)数据库层面执行验证效果
刚开始1/3概率发送消息失败,通过切面将消息保存到本地消息表secure_invoke_record中。
可以看出创建时间为00:25,下一次重试时间为00:27,到了00:27定时任务启动,执行重试,成功后把这个本地消息删除,并重发MQ
自定义的topic也已经经过重试发送消息成功
(2)本地idea控制台日志层面分析
在时间为00:25发送失败,写入本地消息表。在重试时间为00:27,发送MQ消息成功。
4.3、项目结构
主要提交代码如下
提交MR地址
4.4、项目源码
项目源码demo-springboot-mybatisplus,欢迎大家star!