当前位置: 首页 > article >正文

轻量级TCC框架的实现

现有seata、tcc-transaction等tcc框架实现都较为重量级,今天主要带来一种轻量级的实现,大概只用了1200行代码实现。不依赖具体框架grpc、http、dubbo等,只需要业务系统按照标准化实现Try、Commit、Cancel实现接口即可。

已解决悬挂、幂等、空回滚、事务嵌套问题,业务层面无需关注这部分处理。

TCC分为以下几个阶段:

  1. 执行前置动作 (业务资源的初始化,例如: 创建一个初始化的订单)
  2. Try (调用外部服务,进行资源的预留)
  3. 执行本地事务 (需要保证业务是在一个事务内完成)
  4. Commit\Cancel (根据本地事务的执行的成功与否,进行commit || cancel)

示例

该示例主要用于用户下单的同时,需要扣减用户积分的场景,订单服务和积分服务分别是独立服务部署,它们之间存在分布式事务的问题, 我们通过当前框架展示是如何解决以上问题的。

tcc/src/test/java/com/damon/sample at master · 654894017/tcc · GitHub

步骤1.初始化订单服务数据库表

-- 创建事务表
CREATE TABLE `tcc_main_log_order` (
  `biz_id` bigint NOT NULL COMMENT '业务id',
  `status` int NOT NULL DEFAULT '0' COMMENT '状态: 1 创建事务成功 2  回滚成功  3 完成本地事务成功  4 提交事务成功',
  `version` int NOT NULL DEFAULT '0' COMMENT '版本号',
  `last_update_time` bigint NOT NULL DEFAULT '0' COMMENT '最后更新时间',
  `create_time` bigint NOT NULL DEFAULT '0' COMMENT '创建时间',
  `checked_times` int NOT NULL DEFAULT '0' COMMENT '失败检查次数',
  PRIMARY KEY (`biz_id`),
  KEY `idx_status_checked_times_create_time` (`status`,`checked_times`,`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='主事务日志表';

-- 创建订单表
CREATE TABLE `tcc_demo_order` (
  `order_id` bigint NOT NULL,
  `status` int NOT NULL,
  `user_id` bigint NOT NULL,
  `deduction_points` bigint DEFAULT NULL,
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

步骤2.积分服务创建子事务表

-- 创建子事务表
CREATE TABLE `tcc_sub_log_order` (
  `biz_id` bigint NOT NULL COMMENT '业务id',
  `sub_biz_id` bigint NOT NULL DEFAULT '0' COMMENT '子业务id',
  `status` int NOT NULL DEFAULT '0' COMMENT '状态: 1 创建事务成功 2  提交事务成功  3 回滚事务成功',
  `version` int NOT NULL DEFAULT '0' COMMENT '版本号',
  `last_update_time` bigint NOT NULL DEFAULT '0' COMMENT '最后更新时间',
  `create_time` bigint NOT NULL DEFAULT '0' COMMENT '创建时间',
  PRIMARY KEY (`biz_id`,`sub_biz_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='子事务日志表';

-- 创建积分变动日志表
CREATE TABLE `tcc_demo_points_changing_log` (
  `biz_id` bigint NOT NULL,
  `user_id` bigint NOT NULL,
  `change_points` bigint NOT NULL,
  `change_type` int NOT NULL,
  `status` int NOT NULL,
  PRIMARY KEY (`biz_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

-- 创建用户积分表
CREATE TABLE `tcc_demo_user_points` (
  `user_id` bigint NOT NULL,
  `points` bigint NOT NULL,
  PRIMARY KEY (`user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

-- 初始化用户积分
INSERT INTO `tcc_demo_user_points` (`user_id`, `points`) VALUES (12345678, 999999999989989999);

注意事项

事务表都是以tcc_main_log_xxxx 命名,子事务表都是以tcc_sub_log_xxxx命名,xxxx为业务分类,例如订单下单的业务,事务表命名为tcc_main_log_order, 子事务表命名为tcc_sub_log_order.

步骤3.运行 com.damon.sample.points.PointsApplication

步骤4.运行 com.damon.sample.order.TestRun

下单服务

下单服务继承TccMainService服务

package com.damon.sample.order.app;

import cn.hutool.core.util.IdUtil;
import com.damon.sample.order.client.IOrderSubmitAppService;
import com.damon.sample.order.domain.IPointsGateway;
import com.damon.sample.order.domain.Order;
import com.damon.tcc.TccMainService;
import com.damon.tcc.config.TccMainConfig;
import com.damon.tcc.exception.TccLocalTransactionException;
import com.damon.tcc.exception.TccPrepareException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class OrderSubmitAppService extends TccMainService<Long, Map<String, Boolean>, Order> implements IOrderSubmitAppService {
    private final JdbcTemplate jdbcTemplate;
    private final IPointsGateway pointsGateway;

    @Autowired
    public OrderSubmitAppService(TccMainConfig config, IPointsGateway pointsGateway) {
        super(config);
        this.jdbcTemplate = new JdbcTemplate(config.getDataSource());
        this.pointsGateway = pointsGateway;
    }

    /**
     * 检查失败的日志,用于纠正事务是否需要回顾还是提交
     */
    public void executeFailedLogCheck() {
        super.executeFailedLogCheck();
    }

    /**
     * 检查死亡的日志,用于纠正事务是否需要回顾还是提交
     */
    public void executeDeadLogCheck() {
        super.executeDeadLogCheck();
    }

    /**
     * 执行失败日志检查的时候需要回查请求参数(因为事务日志未记录方法请求参数,所以需要回查一下)
     *
     * @param bizId 实体对象id(业务id)
     * @return
     */
    @Override
    protected Order callbackParameter(Long bizId) {
        return jdbcTemplate.queryForObject("select * from tcc_demo_order where order_id = ? ", new BeanPropertyRowMapper<>(Order.class), bizId);
    }

    /**
     * 创建订单 (1 预先创建订单  2 执行try动作)
     *
     * @param userId
     * @param points
     * @return
     */
    @Override
    public Long submitOrder(Long userId, Long points) {
        Long orderId = IdUtil.getSnowflakeNextId();
        // 预创建订单
        jdbcTemplate.update("insert into tcc_demo_order(order_id, user_id, status, deduction_points) values (?, ?, ? ,? )",
                orderId, userId, 0, points);
        Order order = new Order(orderId, 0, userId, points);
        try {
            return super.process(order);
        } catch (TccPrepareException e) {
            throw e;
        } catch (TccLocalTransactionException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException("系统异常");
        }
    }

    /**
     * try执行用户积分扣除
     *
     * @param order
     * @return
     */
    @Override
    protected Map<String, Boolean> prepare(Order order) {
        Boolean result = pointsGateway.tryDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints());
        Map<String, Boolean> map = new HashMap<>();
        map.put("flag", result);
        return map;
    }

    @Override
    protected Long executeLocalTransaction(Order object, Map<String, Boolean> map) {
        int result = jdbcTemplate.update("update tcc_demo_order set status = ?  where order_id = ? ", 1, object.getOrderId());
        if (result == 0) {
            throw new RuntimeException("无效的订单id : " + object.getOrderId());
        }
        return object.getOrderId();
    }

    /**
     * commit积分
     *
     * @param order
     */
    @Override
    protected void commit(Order order) {
        pointsGateway.commitDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints());
    }

    /**
     * cancel回滚积分
     *
     * @param order
     */
    @Override
    protected void cancel(Order order) {
        pointsGateway.cancelDeductionPoints(order.getOrderId(), order.getUserId(), order.getDeductionPoints());
    }
}

积分服务

积分服务继承TccSubService服务

package com.damon.sample.points.app;

import com.damon.sample.points.client.IPointsDeductionAppService;
import com.damon.sample.points.client.PointsDeductCmd;
import com.damon.tcc.config.TccSubConfig;
import com.damon.tcc.TccSubService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Service
public class PointsDeductionAppService extends TccSubService<Boolean, PointsDeductCmd> implements IPointsDeductionAppService {
    private final Logger log = LoggerFactory.getLogger(PointsDeductionAppService.class);
    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public PointsDeductionAppService(TccSubConfig config) {
        super(config);
        this.jdbcTemplate = new JdbcTemplate(config.getDataSource());
    }

    /**
     * try执行积分扣减
     * @param parameter
     * @return
     */
    @Override
    public boolean attempt(PointsDeductCmd parameter) {
        return super.prepare(parameter, cmd -> {
            int result = jdbcTemplate.update("update tcc_demo_user_points set points = points - ? where user_id = ? and points - ? >= 0",
                    cmd.getDeductionPoints(), cmd.getUserId(), cmd.getDeductionPoints());
            boolean transactionActive = TransactionSynchronizationManager.isActualTransactionActive();

            if (result == 0) {
                throw new RuntimeException("用户积分不足 || 用户不存在");
            }

            int result2 = jdbcTemplate.update("insert tcc_demo_points_changing_Log (user_id, change_points, change_type, biz_id, status) values(?,?,?,?,?)",
                    cmd.getUserId(), cmd.getDeductionPoints(), 1, cmd.getOrderId(), 0);

            return true;
        });
    }

    /**
     * commit提交积分扣减
     * @param parameter
     */
    @Override
    public void commit(PointsDeductCmd parameter) {
        super.commit(parameter, cmd -> {
            int result = jdbcTemplate.update("update tcc_demo_points_changing_Log set status = 1 where biz_id = ?", cmd.getBizId());
            if (result == 0) {
                throw new RuntimeException("无效的业务id,无法积分commit");
            }
        });
    }

    /**
     * cancel回滚积分扣减
     * @param parameter
     */
    @Override
    public void cancel(PointsDeductCmd parameter) {
        super.cancel(parameter, cmd -> {
            int result = jdbcTemplate.update("update tcc_demo_points_changing_Log set status = 2 where biz_id = ?", cmd.getBizId());
            if (result == 0) {
                log.error("无效的业务id : {},无法进行积分cancel", cmd.getBizId());
                return;
            }

            int result2 = jdbcTemplate.update("update tcc_demo_user_points set points = points + ? where user_id = ?",
                    cmd.getDeductionPoints(), cmd.getUserId()
            );
            if (result2 == 0) {
                throw new RuntimeException("无效的用户id,无法进行积分rollback");
            }
        });
    }


}

FAQ

1.关于幂等、悬挂、空回滚如何解决?

主要基于执行先判断的思路来实现的,在子系统执行Cancle的时候,都会先判断tcc_sub_log_xxxx这个表的事务事务已经完成或者取消,如果已完成直接不执行业务,如果未执行则执行Cancel业务,同时更新tcc_sub_log_xxxx的日志状态为已取消。最极端情况举一个示例,假如服务提供者try和cancel同时在执行(try因为网络问题非常滞后的到达业务服务器,这时主服务因为等待超时,调用了子服务cancel动作)。主要分两种情况:

1.假如先执行了cancel,在执行Try则不会执行,因为子事务已经取消(tcc_sub_log_xxxx会增加一条取消日志),在执行try操作时会出现索引冲突异常(tcc_sub_log_xxxx表有biz_id + sub_biz_id唯一索引),子事务的try会回滚。

2.假如先执行了try,在执行cancel则会执行正常取消,属于正常情况。还一种情况就是同时执行了Try、cancel操作,这时候只能依赖tcc_sub_log_xxxx表有biz_id + sub_biz_id唯一索引来解决更新冲突问题。假如Try先执行,cacel就会报错,上游服务重新发起cancel即可。假如先执行了cancle,则try会报错(唯一索引冲突),这时不用处理,子事务的try会回滚。

2.tcc_sub_log_xxxx表事务需要和本地业务在一个数据库事务?

是的,幂等、悬挂、空回滚问题都是基于tcc_sub_log_xxxx的事务日志表进行的,需要保证业务事务和tcc_sub_log_xxxx表的事务在一个数据库事务下。

3.上游系统重放try、commit、cancel怎么处理?

调用方误触发,存在以下几种可能

1.已经commit的事务,调用了cancle,已增加事务是否已commit判断,已commit的事务调用cancel不会执行,同时需要基于tcc_sub_log_xxxx表的version实现的乐观锁来解决更新冲突的问题。

2.已cancel的事务,调用了commit,已增加事务是否已cancel判断,已cancel的事务调用coommit不会执行,同时需要基于tcc_sub_log_xxxx表的version实现的乐观锁来解决更新冲突的问题。

3.已cancel的事务,调用了try,依赖tcc_sub_log_xxxx表biz_id + sub_biz_id唯一索引来解决更新冲突问题。

4.已commit的事务,调用了try,依赖tcc_sub_log_xxxx表biz_id + sub_biz_id唯一索引来解决更新冲突问题。

5.重复try依赖tcc_sub_log_xxxx表biz_id + sub_biz_id唯一索引来解决更新冲突问题。

6.重复commit,已增加事务是否已commit判断,已commit的事务调用commit不会执行,同时需要基于tcc_sub_log_xxxx表的version实现的乐观锁来解决更新冲突的问题。

7.重复cancel,已增加事务是否已cancel判断,已cancel的事务调用cancel不会执行,同时需要基于tcc_sub_log_xxxx表的version实现的乐观锁来解决更新冲突的问题。

GitHub - 654894017/tcc: 编程式tcc框架,已解决悬挂、幂等、空回滚、事务嵌套问题。


http://www.kler.cn/a/578717.html

相关文章:

  • ZerotTier -- 开源、不限流、实现远程连接的内网穿透工具(window环境)
  • std::vector的模拟实现
  • Python----数据可视化(Seaborn二:绘图一)
  • vue管理系统常规布局思路,头部+菜单+主题(Container 布局容器)
  • 【编译器】VSCODE搭建ESP32-C3
  • C++【类和对象】
  • 第四届大数据、区块链与经济管理国际学术会议
  • Spring使用@Scheduled注解的参数详解
  • 基于ANTLR4的大数据SQL编辑器解析引擎实践|得物技术
  • Redis- 切片集群
  • LEETCODE:二叉树的层序遍历JAVA
  • android viewmodel如何使用
  • 用OpenCV写个视频播放器可还行?(C++版)
  • 靶场(四)---小白心得全流程分析
  • AIP-162 资源修订
  • Docker(认识且会基础操作)
  • yolov5训练自己数据集的全流程+踩过的坑
  • Linux 入门:常用命令速查手册
  • 【 <一> 炼丹初探:JavaWeb 的起源与基础】之 JSP 标签库:自定义标签的开发与应用
  • 2025开源SCA工具推荐 | 组件依赖包安全风险检测利器