当前位置: 首页 > article >正文

基于本地事务表+MQ实现分布式事务

基于本地事务表+MQ实现分布式事务

  • 引言
    • 1、原理
    • 2、本地消息表优缺点
    • 3、本地启动rocketmq
    • 4、代码实现及验证
      • 4.1、核心代码
      • 4.2、代码执行流程
      • 4.3、项目结构
      • 4.4、项目源码

引言

本地消息表的方案最初由ebay的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理。本地消息表实现最终一致性。本文主要学习mallchat开源项目中的实现方案,向mallchat开发团队致敬!

1、原理

分布式事务解决方案有许多比如二阶段提交、TCC、最大努力通知、Saga事务等,本文介绍本地消息表+MQ这种方式解决分布式事务消息最总一致性问题。目前利用本地消息表+MQ方案实现最终消息一致性的比较多,它的核心思想是,将分布式事务拆分成本地事务进行处理,不同事务之间通过消息表和MQ通信,最后通过定时任务扫描失败的数据进行重试,当在有效重试次数限制内,再次重试回调失败的数据,最终实现消息重复发送,达到一致性。
在这里插入图片描述
在这里插入图片描述

2、本地消息表优缺点

本地消息表实现了分布式事务的最终一致性,优缺点比较明显。
优点
1.实现逻辑简单,开发成本比较低
缺点
1.与业务场景绑定,高耦合,不可公用
2.本地消息表与业务数据表在同一个库,占用业务系统资源,量大可能会影响数据库性能

3、本地启动rocketmq

首先本地启动rocketmq服务。

1:在D:\rocketmq-all-5.3.1-bin-release\bin目录下,执行cmd命令
启动服务
start mqnamesrv.cmd
启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
关闭服务
mqshutdown namesrv
关闭broker
mqshutdown broker
2:启动rocketmq-dashboard项目,输入127.0.0.1:8080进入可视化界面

其中rocketmq-dashboard可视化代码参考windows下安装启动rocketmq可视化界面
将rocketmq-dashboard导入到idea中,在idea编译启动。
在这里插入图片描述

4、代码实现及验证

4.1、核心代码

(1)注解SecureInvoke

/**
 * 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。
 *
 * @author hauhua
 * @DATE 2025/1/21
 */
@Retention(RetentionPolicy.RUNTIME)//运行时生效
@Target(ElementType.METHOD)//作用在方法上
public @interface SecureInvoke {

    /**
     * 默认3次
     *
     * @return 最大重试次数(包括第一次正常执行)
     */
    int maxRetryTimes() default 3;

    /**
     * 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。
     * 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。
     *
     * @return 是否异步执行
     */
    boolean async() default true;
}

(2)自定义切面SecureInvokeAspect

/**
 * 安全执行切面
 *
 * @author hauhua
 * @DATE 2025/1/21
 */
@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行
@Component
public class SecureInvokeAspect {
    @Autowired
    private SecureInvokeService secureInvokeService;

    @Around("@annotation(secureInvoke)")
    public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable {
        boolean async = secureInvoke.async();
        boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
        // 非事务状态,直接执行,不做任何保证。
        if (!inTransaction) {
            return joinPoint.proceed();
        }
        // 如果是事务状态,保证保存到本地消息表中的数据和保存在数据库中数据,在同一个事务内
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList());
        SecureInvokeVo dto = SecureInvokeVo.builder()
                .args(JsonUtils.toStr(joinPoint.getArgs()))
                .className(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameterTypes(JsonUtils.toStr(parameters))
                .build();
        SecureInvokeRecord record = SecureInvokeRecord.builder()
                .secureInvokeVo(dto)
                .maxRetryTimes(secureInvoke.maxRetryTimes())
                .nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES))
                .createTime(new Date())
                .updateTime(new Date())
                .build();
        secureInvokeService.invoke(record, async);
        return null;
    }
}

(3)控制层MessageController

/**
 * 消息管理
 *
 * @author hauhua
 * @DATE 2025/1/21
 **/
@RestController
public class MessageController {

    @Autowired
    private MQProducerService mqProducerService;

    @RequestMapping("sendMsg")
    @Transactional
    public void sendMsg(@RequestParam String topic, @RequestBody Object body) {
        mqProducerService.sendSecureMsg(topic, body,"test");
    }
}

(4)消息Service层MQProducerService

/**
 * 发送mq工具类
 *
 * @author hauhua
 * @DATE 2025/1/21
 */
@Slf4j
public class MQProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMsg(String topic, Object body) {
        Message<Object> build = MessageBuilder.withPayload(body).build();
        rocketMQTemplate.send(topic, build);
    }

    /**
     * 发送可靠消息,在事务提交后保证发送成功
     *
     * @param topic
     * @param body
     */
    @SecureInvoke
    public void sendSecureMsg(String topic, Object body, Object key) {
        // 通过切面实现本地消息表,因为要在此方法执行前将数据保存到本地消息表,进行功能增强
        // 模拟发送mq消息有1/3概率失败
        if(RandomUtil.randomInt(3) >= 1){
            log.info("sendSecureMsg fail");
            throw new IllegalStateException();
        }
        Message<Object> build = MessageBuilder
                .withPayload(body)
                .setHeader("KEYS", key)
                .build();
        log.info("sendSecureMsg start");
        rocketMQTemplate.send(topic, build);
        log.info("sendSecureMsg end");
    }
}

(5)安全执行处理器SecureInvokeService

@Slf4j
@AllArgsConstructor
public class SecureInvokeService {

    public static final double RETRY_INTERVAL_MINUTES = 2D;

    private final SecureInvokeRecordDao secureInvokeRecordDao;

    private final Executor executor;

    // 每5秒执行一次定时任务,从数据库中获取所有等待重试的记录,并异步调用这些记录对应的方法
    @Scheduled(cron = "*/5 * * * * ?")
    public void retry() {
        List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords();
        if (secureInvokeRecords.isEmpty()) {
            log.info("SecureInvokeService retry no record");
            return;
        }

        for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) {
            doAsyncInvoke(secureInvokeRecord);
        }
    }

    public void save(SecureInvokeRecord record) {
        secureInvokeRecordDao.save(record);
    }

    private void retryRecord(SecureInvokeRecord record, String errorMsg) {
        Integer retryTimes = record.getRetryTimes() + 1;
        SecureInvokeRecord update = new SecureInvokeRecord();
        update.setId(record.getId());
        update.setFailReason(errorMsg);
        update.setNextRetryTime(getNextRetryTime(retryTimes));
        update.setUpdateTime(new Date());
        if (retryTimes > record.getMaxRetryTimes()) {
            update.setStatus(SecureInvokeRecord.STATUS_FAIL);
        } else {
            update.setRetryTimes(retryTimes);
        }
        secureInvokeRecordDao.updateById(update);
    }

    private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法
        double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m
        return DateUtil.offsetMinute(new Date(), (int) waitMinutes);
    }

    private void removeRecord(Long id) {
        secureInvokeRecordDao.removeById(id);
    }

    public void invoke(SecureInvokeRecord record, boolean async) {
        boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();
        //非事务状态,直接执行,不做任何保证。
        if (!inTransaction) {
            return;
        }
        //保存执行数据
        record.setStatus(SecureInvokeRecord.STATUS_WAIT);
        save(record);
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @SneakyThrows
            @Override
            public void afterCommit() {
                //事务后执行
                if (async) {
                    doAsyncInvoke(record);
                } else {
                    doInvoke(record);
                }
            }
        });
    }

    public void doAsyncInvoke(SecureInvokeRecord record) {
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName());
            doInvoke(record);
        });
    }

    public void doInvoke(SecureInvokeRecord record) {
        SecureInvokeVo secureInvokeVo = record.getSecureInvokeVo();
        try {
            // 构造method以及对应的类和method中的args参数
            SecureInvokeHolder.setInvoking();
            Class<?> beanClass = Class.forName(secureInvokeVo.getClassName());
            Object bean = SpringUtil.getBean(beanClass);
            List<String> parameterStrings = JsonUtils.toList(secureInvokeVo.getParameterTypes(), String.class);
            List<Class<?>> parameterClasses = getParameters(parameterStrings);
            Method method = ReflectUtil.getMethod(beanClass, secureInvokeVo.getMethodName(), parameterClasses.toArray(new Class[]{}));
            Object[] args = getArgs(secureInvokeVo, parameterClasses);
            // 执行方法,回调自己在事务提交后的逻辑
            method.invoke(bean, args);
            // 执行成功则删除secure_invoke_record对应的数据,说明消息已经发送成功
            removeRecord(record.getId());
        } catch (Throwable e) {
            log.error("SecureInvokeService invoke fail", e);
            // 执行失败,等待下次执行(用于重试,下一次执行时间会有对应的算法)
            retryRecord(record, e.getMessage());
        } finally {
            SecureInvokeHolder.invoked();
        }
    }

    @NotNull
    private Object[] getArgs(SecureInvokeVo secureInvokeVo, List<Class<?>> parameterClasses) {
        JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeVo.getArgs());
        Object[] args = new Object[jsonNode.size()];
        for (int i = 0; i < jsonNode.size(); i++) {
            Class<?> aClass = parameterClasses.get(i);
            args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass);
        }
        return args;
    }

    @NotNull
    private List<Class<?>> getParameters(List<String> parameterStrings) {
        return parameterStrings.stream().map(name -> {
            try {
                return Class.forName(name);
            } catch (ClassNotFoundException e) {
                log.error("SecureInvokeService class not fund", e);
            }
            return null;
        }).collect(Collectors.toList());
    }
}

4.2、代码执行流程

请添加图片描述
由于在代码中写死了模拟1/3概率发送消息失败,所以刚开始会将数据插入secure_invoke_record,然后通过TransactionSynchronizationManager执行后置处理,最后通过定时任务,扫描失败的任务,从secure_invoke_record取出数据利用MQ再次重试,并且删除secure_invoke_record表刚刚重试的数据,最后执行完成,达到消息最终一致性。
(1)数据库层面执行验证效果
刚开始1/3概率发送消息失败,通过切面将消息保存到本地消息表secure_invoke_record中。
请添加图片描述
可以看出创建时间为00:25,下一次重试时间为00:27,到了00:27定时任务启动,执行重试,成功后把这个本地消息删除,并重发MQ
请添加图片描述

自定义的topic也已经经过重试发送消息成功
在这里插入图片描述
(2)本地idea控制台日志层面分析
在这里插入图片描述
在这里插入图片描述

在时间为00:25发送失败,写入本地消息表。在重试时间为00:27,发送MQ消息成功。

4.3、项目结构

在这里插入图片描述
主要提交代码如下
在这里插入图片描述
提交MR地址

4.4、项目源码

项目源码demo-springboot-mybatisplus,欢迎大家star!


http://www.kler.cn/a/518664.html

相关文章:

  • vim的多文件操作
  • C#,入门教程(05)——Visual Studio 2022源程序(源代码)自动排版的功能动画图示
  • 用Python和PyQt5打造一个股票涨幅统计工具
  • 数据结构:二叉树—面试题(二)
  • 讯飞绘镜(ai生成视频)技术浅析(二):大模型
  • 【Vim Masterclass 笔记24】S10L43 + L44:同步练习10 —— 基于 Vim 缓冲区的各类基础操作练习(含点评课)
  • 计算机工程:解锁未来科技之门!
  • Spring Boot Actuator 集成 Micrometer(官网文档解读)
  • 警企联动齐发力、共筑反诈“防护墙”
  • 2025美赛B题-问题B:管理可持续旅游
  • 如何使用 MySQL 的 EXPLAIN 语句进行查询分析?
  • 数据结构测试题2
  • day1代码练习
  • 在 DevOps 实践中,如何构建自动化的持续集成和持续交付(CI/CD)管道,以提高开发和测试效率?
  • 浅谈Redis
  • c语言分支和循环
  • ros动态调参界面的修改
  • Linux内核中IPoIB驱动模块的初始化与实现
  • 什么是COLLATE排序规则?
  • WPF基础 | WPF 基础概念全解析:布局、控件与事件
  • 2025-01-22 Unity Editor 1 —— MenuItem 入门
  • 2025美赛数学建模MCM/ICM选题建议与分析,思路+模型+代码
  • 寒假1.23
  • springboot图书馆管理系统前后端分离版本
  • 程序员转型测试:解锁漏洞挖掘新旅程
  • Ubuntu终端CTRL+S被锁定后解锁快捷键