
Integrating Seata with Spring Cloud: XA, AT, TCC, and SAGA modes

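References:

SpringCloud-Alibaba setup

SpringCloud-nacos integration

Seata deployment

Reference demo (and study materials)

Seata official website

Reference video 1 (its explanation of UNDO_LOG in AT mode may be slightly off, but it is very easy to follow)

Reference video 2 (not as easy to follow)

Gupao paid video (essentially a read-through of the official documentation)

Reference materials for the three videos above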


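Prerequisites:

        This tutorial assumes the following environment is already in place; if it is not, see the tutorials listed above:

  • Nacos is deployed
  • Seata is deployed
  • Spring Cloud is set up
  • Versions are aligned (it is enough that the jar versions used in the JVM match the deployed server versions)
  • For compatible version combinations, see the Wiki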

 


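Simulated scenario: building the demo:

        Seata handles distributed transactions, so this example needs several microservices working against several databases.

Create the databases

        First, create two databases: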

CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db1` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */;

USE `seata_db1`;

/*Table structure for table `money` */

DROP TABLE IF EXISTS `money`;

CREATE TABLE `money` (
  `id` varchar(32) NOT NULL COMMENT 'primary key',
  `user_name` varchar(32) DEFAULT NULL COMMENT 'name',
  `bank_money` double DEFAULT NULL COMMENT 'bank balance',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `money` */

insert  into `money`(`id`,`user_name`,`bank_money`) values 
('084d862fa66c4c82886a4b0bb9214ab1','张三',100);

 

CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */;

USE `seata_db2`;

/*Table structure for table `order_details` */

DROP TABLE IF EXISTS `order_details`;

CREATE TABLE `order_details` (
  `id` varchar(32) NOT NULL COMMENT 'primary key',
  `goods_name` varchar(32) DEFAULT NULL COMMENT 'goods name',
  `count` int(11) DEFAULT NULL COMMENT 'remaining stock',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `order_details` */

insert  into `order_details`(`id`,`goods_name`,`count`) values 
('7072809e86834335843bc918c33074ec','desk',50);

 

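 Create the example microservices

         Use the Spring Cloud framework set up earlier; Nacos is assumed to be integrated and Seata deployed.

        Creating the microservices is not covered again here; see the referenced article.

          Following that article, create two microservices, seata-demo1 and seata-demo2, then import the dependencies.

        Here all dependencies are added to a shared module, in the pom.xml of common-service; adapt this to your own setup.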

Lombok dependency

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

 Web (controller layer) dependency

        <!-- springboot starter web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

 mybatis-plus and database dependencies

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1.tmp</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>

Feign component

        <!-- Feign component -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

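 Nacos dependencies (the managed default version is used; if your Nacos is not from the release train above, specify the version explicitly)

        <!-- Nacos discovery client -->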
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!-- Nacos config client -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>

Helper dependencies needed for bootstrap.yml startup

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
        </dependency>

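That completes the dependencies of the shared module.

Then add the shared module as a dependency of the two new microservices, for example: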

        <dependency>
            <groupId>org.example</groupId>
            <artifactId>common-service</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

Hand the configuration over to Nacos; see the referenced article. Only the code is shown here, without further explanation.

  • seata-demo1
bootstrap.yml
spring:
  profiles:
    include:
      uat

 bootstrap-uat.yml

spring:
  config:
    use-legacy-processing: true
  profiles.active: uat
  application:
    name: seata-demo1
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        group: DEFAULT_GROUP
        username: nacos
        password: nacos
        file-extension: yaml
        refresh-enabled: true

Then put the following configuration into Nacos

server:
  port: 8081
spring:
    # Nacos discovery properties belong under spring.cloud.nacos.discovery
    cloud:
        nacos:
            discovery:
                server-addr: localhost:8848
                username: nacos
                password: nacos
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8
        username: root
        password: 123888

  •  seata-demo2
bootstrap.yml
spring:
  profiles:
    include:
      uat

bootstrap-uat.yml
spring:
  config:
    use-legacy-processing: true
  profiles.active: uat
  application:
    name: seata-demo2
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        group: DEFAULT_GROUP
        username: nacos
        password: nacos
        file-extension: yaml
        refresh-enabled: true

 Then put the following configuration into Nacos

server:
  port: 8080
spring:
    # Nacos discovery properties belong under spring.cloud.nacos.discovery
    cloud:
        nacos:
            discovery:
                server-addr: localhost:8848
                username: nacos
                password: nacos
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8
        username: root
        password: 123888

The services can now connect to their databases. Next, write the three mybatis-plus layers; see the referenced article. Again, only the code is shown here.

  • seata-demo1
package org.example.entity;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("money")
public class Money {
    @TableId
    private String id;
    @TableField("user_name")
    private String userName;
    @TableField("bank_money")
    private Double bankMoney;
}
package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Money;

public interface MoneyMapper extends  BaseMapper<Money> {
}
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Money;

public interface MoneyService extends IService<Money> {
    
}
package org.example.service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.example.dao.MoneyMapper;
import org.example.entity.Money;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{

}

  • seata-demo2
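package org.example.entity;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data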
@TableName("order_details")
public class OrderDetails {
    @TableId
    private String id;
    @TableField("goods_name")
    private String goodsName;
    @TableField("count")
    private int count;
}
package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.OrderDetails;

public interface OrderDetailsMapper extends BaseMapper<OrderDetails> {
}
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.OrderDetails;

public interface OrderDetailsService extends IService<OrderDetails> {


}

 Then write the business code: the distributed call that simulates deducting money and deducting stock

  • seata-demo1

Feign client

package org.example.client;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;

// The value must match spring.application.name of the target service (seata-demo2 in bootstrap-uat.yml)
@FeignClient(value = "seata-demo2")
public interface Demo2Client {
    @RequestMapping("order/addCount")
    ResponseEntity<String> addCount(@RequestParam("count") int count);
}
MoneyService
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Money;

public interface MoneyService extends IService<Money> {
    void changeMoney(Money money,Double num);
}
MoneyServiceImpl
package org.example.service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.example.dao.MoneyMapper;
import org.example.entity.Money;

@Service
public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{

    @Override
    public void changeMoney(Money money, Double num) {
        Double bankMoney = money.getBankMoney();
        if (bankMoney >= num) {
            money.setBankMoney(bankMoney - num);
            this.updateById(money);
        }else{
            throw new RuntimeException("Insufficient balance");
        }
    }
}
TransService
package org.example.service;

public interface TransService {
    void globalRollBackDemo(int orderCount);
}
TransServiceImpl
package org.example.service;

import lombok.extern.slf4j.Slf4j;
import org.example.client.Demo2Client;
import org.example.entity.Money;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@Slf4j
public class TransServiceImpl implements TransService {
    @Autowired
    private MoneyService moneyService;

    @Autowired
    private Demo2Client demo2Client;

    /**
     * Distributed transaction rollback example
     */
    @Override
    public void globalRollBackDemo(int orderCount) {
        log.info("================= start deducting money =================");
        List<Money> list = moneyService.list();
        Money money = list.get(0);
        Double bankMoney = money.getBankMoney();
        log.info("================= bank balance: {}, deducting 20 =================", bankMoney);
        moneyService.changeMoney(money,20D);
        log.info("================= money deducted, now deducting stock by {} =================", orderCount);
        demo2Client.addCount(orderCount);
    }
}
MoneyController
package org.example.controller;

import org.example.client.Demo2Client;
import org.example.service.TransService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@RequestMapping("money")
public class MoneyController {

    @Autowired
    private TransService transService;

    @RequestMapping("test/globalTransactional/fail")
    public ResponseEntity<String> testGTRollBack() {
        transService.globalRollBackDemo(60);
        return new ResponseEntity<String>("sucess",HttpStatus.OK);
    }

    @RequestMapping("test/globalTransactional/sucess")
    public ResponseEntity<String> testGTSucess() {
        transService.globalRollBackDemo(40);
        return new ResponseEntity<String>("sucess",HttpStatus.OK);
    }


}

SeataDemo1Application
package org.example;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;


@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo1Application
{
    public static void main( String[] args )
    {
        SpringApplication.run(SeataDemo1Application.class,args);
    }
}

  • seata-demo2
OrderDetailsController
package org.example.controller;

import org.example.entity.OrderDetails;
import org.example.service.OrderDetailsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;


@Controller
@RequestMapping("order")
public class OrderDetailsController {
    @Autowired
    private OrderDetailsService orderService;

    @RequestMapping("addCount")
    public ResponseEntity<String> addCount(@RequestParam("count") Integer count) {
        orderService.updateOrderDetails(count);
        return new ResponseEntity<String>("success",HttpStatus.OK);
    }


}
OrderDetailsService
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.OrderDetails;

public interface OrderDetailsService extends IService<OrderDetails> {

    // Update the remaining stock
    void updateOrderDetails(Integer count);
    
}
OrderDetailsServiceImpl
package org.example.service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.example.dao.OrderDetailsMapper;
import org.example.entity.OrderDetails;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@Slf4j
public class OrderDetailsServiceImpl extends ServiceImpl<OrderDetailsMapper, OrderDetails> implements OrderDetailsService {

    @Override
    public void updateOrderDetails(Integer count) {
        List<OrderDetails> list = this.list();
        // Use the first row
        OrderDetails orderDetails = list.get(0);
        if (orderDetails.getCount()>=count) {
            int sum = orderDetails.getCount() - count;
            orderDetails.setCount(sum);
            this.updateById(orderDetails);
            log.info("Stock deducted successfully");
        }else{
            throw new RuntimeException("Stock is " + orderDetails.getCount() + ", not enough; rolling back");
        }
    }
}
SeataDemo2Application
package org.example;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;


@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo2Application
{
    public static void main( String[] args )
    {
        SpringApplication.run(SeataDemo2Application.class,args);
    }
}
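
Note that the demo code up to this point has no global transaction yet. The following is a minimal in-memory sketch (plain Java, no Seata, no database) of what calling test/globalTransactional/fail would do in that state, assuming each service commits its own local change immediately. The class NoGlobalTxDemo and its fields are hypothetical stand-ins for the two tables, seeded with the demo's initial values.

```java
/** In-memory stand-in for the two services; illustrates the problem, not Seata itself. */
class NoGlobalTxDemo {
    // Hypothetical "databases", seeded like the demo's initial rows
    static double bankMoney = 100;   // seata_db1.money.bank_money
    static int stock = 50;           // seata_db2.order_details.count

    static void reset() {
        bankMoney = 100;
        stock = 50;
    }

    /** Mirrors MoneyServiceImpl.changeMoney: the local change is applied at once. */
    static void changeMoney(double num) {
        if (bankMoney < num) {
            throw new RuntimeException("insufficient balance");
        }
        bankMoney -= num;
    }

    /** Mirrors OrderDetailsServiceImpl.updateOrderDetails. */
    static void deductStock(int count) {
        if (stock < count) {
            throw new RuntimeException("insufficient stock");
        }
        stock -= count;
    }

    /** Mirrors TransServiceImpl.globalRollBackDemo without any global transaction. */
    static void globalRollBackDemo(int orderCount) {
        changeMoney(20);
        deductStock(orderCount); // fails for 60, but the money is already gone
    }

    public static void main(String[] args) {
        try {
            globalRollBackDemo(60);
        } catch (RuntimeException e) {
            System.out.println("remote call failed: " + e.getMessage());
        }
        // prints bankMoney=80.0, stock=50
        System.out.println("bankMoney=" + bankMoney + ", stock=" + stock);
    }
}
```

The balance drops to 80 while the stock stays at 50, which is exactly the inconsistency that the Seata modes in the following sections are meant to prevent.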


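XA mode:

Import the dependency

  •  The Seata jar can be imported in the shared module or in each individual service
  • If the jar belongs to the same release train as Spring Cloud (see the version guide), the version can be omitted; otherwise specify it explicitly (matching the deployed Seata Server)
  • This tutorial puts the jar in the shared module and pins the version

        <!-- Be sure to import the right artifact: the spring-cloud Seata starter, not only the springboot Seata starter -->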
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <!-- Exclude the Seata version pulled in by default so it cannot conflict with the version declared below -->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.7.0</version>
        </dependency>

Add the Seata configuration

 In the configuration managed by Nacos, find the seata-demo1 and seata-demo2 entries and add the following:

# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: XA
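
To make the relationship between tx-service-group and vgroup-mapping in the configuration above easier to see, here is a small sketch of the lookup as I understand it: the client takes its transaction service group, looks it up in vgroup-mapping to obtain a cluster name, and then searches the registry for seata-server instances in that cluster. The class and method names are hypothetical and this is not Seata's actual code.

```java
import java.util.Map;

/** Illustrative only: how a tx-service-group resolves to a Seata Server cluster name. */
class VgroupMappingSketch {

    /** Returns the cluster configured for the given transaction service group. */
    static String resolveCluster(Map<String, String> vgroupMapping, String txServiceGroup) {
        String cluster = vgroupMapping.get(txServiceGroup);
        if (cluster == null) {
            // A missing entry means the client cannot find any Seata Server
            throw new IllegalStateException("no vgroup-mapping entry for " + txServiceGroup);
        }
        return cluster;
    }

    public static void main(String[] args) {
        // Same values as seata.service.vgroup-mapping in the YML above
        Map<String, String> vgroupMapping = Map.of("seata_tx_group", "default");
        // prints default
        System.out.println(resolveCluster(vgroupMapping, "seata_tx_group"));
    }
}
```

In practice this means the key under vgroup-mapping has to be spelled exactly like tx-service-group, and its value has to match the cluster name the Seata Server registered with.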

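Enable distributed transactions: XA mode

  • Annotate the method that should start the distributed transaction with @GlobalTransactional
  • For the local transactions, adding @Transactional is recommended
  • For example, to put the money deduction and stock deduction above under one distributed transaction, add @GlobalTransactional to globalRollBackDemo in
    TransServiceImpl of seata-demo1

  • Then annotate the local transactions with @Transactional
  • For example: MoneyServiceImpl in seata-demo1 and OrderDetailsServiceImpl in seata-demo2

With that, the XA-mode distributed transaction is in place.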
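
For intuition about what the global transaction adds, here is a heavily simplified two-phase sketch in plain Java: every branch first prepares its change without making it visible, and the coordinator commits all branches only if every prepare succeeded; otherwise it rolls back the ones already prepared. The Branch interface, CounterBranch, and run are hypothetical names for illustration; real XA branches are database transactions driven by the Seata Server, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified two-phase commit over in-memory counters; not Seata's implementation. */
class TwoPhaseSketch {

    /** Hypothetical branch contract: prepare first, then commit or roll back. */
    interface Branch {
        boolean prepare();
        void commit();
        void rollback();
    }

    /** A counter whose change stays pending until the second phase. */
    static class CounterBranch implements Branch {
        int committed;
        int pending;
        final int delta;

        CounterBranch(int initial, int delta) {
            this.committed = initial;
            this.delta = delta;
        }

        public boolean prepare() {
            if (committed + delta < 0) {
                return false; // would go negative, so vote "no"
            }
            pending = delta;
            return true;
        }

        public void commit() {
            committed += pending;
            pending = 0;
        }

        public void rollback() {
            pending = 0;
        }
    }

    /** Phase 1: prepare every branch. Phase 2: commit all, or roll back the prepared ones. */
    static boolean run(List<? extends Branch> branches) {
        List<Branch> prepared = new ArrayList<>();
        for (Branch b : branches) {
            if (!b.prepare()) {
                for (Branch p : prepared) {
                    p.rollback();
                }
                return false;
            }
            prepared.add(b);
        }
        for (Branch b : prepared) {
            b.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        CounterBranch money = new CounterBranch(100, -20);
        CounterBranch stock = new CounterBranch(50, -60);
        boolean ok = run(Arrays.asList(money, stock));
        // prints false 100 50
        System.out.println(ok + " " + money.committed + " " + stock.committed);
    }
}
```

With the demo's numbers, the failing stock branch causes the money branch to be rolled back as well, so both values keep their initial state.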


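AT mode:

        The AT-mode code is essentially the same as for XA mode; in the YML, change data-source-proxy-mode from XA to AT. Note that AT mode also relies on an undo_log table in each business database (here seata_db1 and seata_db2); create it from the script that ships with your Seata version before switching.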
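
The main difference from XA is that AT commits each local change in phase one and keeps enough information to undo it later. The sketch below illustrates only that idea with in-memory maps: a before-image is saved per global transaction id, a global commit discards it, and a global rollback restores it. UndoLogSketch and its methods are hypothetical; the real mechanism also stores after-images, checks for dirty writes, and takes global locks, none of which is modeled here.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the undo-log idea behind AT mode; not Seata's implementation. */
class UndoLogSketch {
    // Business rows: key -> current value
    final Map<String, Integer> table = new HashMap<>();
    // Undo log: xid -> (key -> value before the first change in that transaction)
    final Map<String, Map<String, Integer>> undoLog = new HashMap<>();

    /** Phase 1: record the before-image, then apply the change immediately. */
    void update(String xid, String key, int newValue) {
        Integer before = table.get(key);
        if (before == null) {
            throw new IllegalArgumentException("unknown row: " + key);
        }
        undoLog.computeIfAbsent(xid, k -> new HashMap<>()).putIfAbsent(key, before);
        table.put(key, newValue);
    }

    /** Phase 2, commit: the data is already in place, so only the undo log is dropped. */
    void commit(String xid) {
        undoLog.remove(xid);
    }

    /** Phase 2, rollback: restore every before-image recorded for this xid. */
    void rollback(String xid) {
        Map<String, Integer> before = undoLog.remove(xid);
        if (before != null) {
            table.putAll(before);
        }
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.table.put("bank_money", 100);
        db.update("xid-1", "bank_money", 80);
        db.rollback("xid-1");
        // prints 100
        System.out.println(db.table.get("bank_money"));
    }
}
```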


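TCC mode:

        The theory behind TCC mode (suspension, empty rollback, and so on) and the TCC-specific annotations are not covered here; see the videos above for details. This section only shows how to implement it.

        Build the TCC version on top of the framework above, following the steps below.

        This TCC example is adapted from the AT-mode setup, so the YML is written the same way as for AT mode.

Update the YML configuration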

seata-demo1
server:
  port: 8081
spring:
    # Nacos discovery properties belong under spring.cloud.nacos.discovery
    cloud:
        nacos:
            discovery:
                server-addr: localhost:8848
                username: nacos
                password: nacos
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8
        username: root
        password: 123888
# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: AT

 seata-demo2
server:
  port: 8080
spring:
    # Nacos discovery properties belong under spring.cloud.nacos.discovery
    cloud:
        nacos:
            discovery:
                server-addr: localhost:8848
                username: nacos
                password: nacos
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8
        username: root
        password: 123888
# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: AT

Create the tables

         In the seata_db1 database above, create two tables: t_account and t_account_tx

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for t_account
-- ----------------------------
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`  (
  `id` INT NOT NULL AUTO_INCREMENT,
  `user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `money` INT NULL DEFAULT 0,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of t_account
-- ----------------------------
INSERT INTO `t_account` VALUES (1, 'U100000', 900);

-- ----------------------------
-- Table structure for t_account_tx
-- ----------------------------
DROP TABLE IF EXISTS `t_account_tx`;
CREATE TABLE `t_account_tx`  (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'transaction id',
  `freeze_money` INT NULL DEFAULT NULL COMMENT 'frozen amount',
  `state` INT NULL DEFAULT NULL COMMENT 'state: 0 try, 1 confirm, 2 cancel',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;

        In the seata_db2 database above, create two tables: t_order and t_order_tx

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` INT NOT NULL AUTO_INCREMENT,
  `user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `commodity_code` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `count` INT NULL DEFAULT 0,
  `money` INT NULL DEFAULT 0,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 25 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;

-- ----------------------------
-- Records of t_order
-- ----------------------------

-- ----------------------------
-- Table structure for t_order_tx
-- ----------------------------
DROP TABLE IF EXISTS `t_order_tx`;
CREATE TABLE `t_order_tx`  (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'transaction id',
  `state` INT NULL DEFAULT NULL COMMENT 'state: 0 try, 1 confirm, 2 cancel',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;

Write the three Mybatis-plus layers and the business layer

 seata-demo1

Entities
package org.example.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_account")
public class Account {
    @TableId(type = IdType.AUTO)
    private Integer id;
    private String userId;
    private int money;
}

package org.example.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_account_tx")
public class AccountTX {

    public static final int STATE_TRY = 0;
    public static final int STATE_CONFIRM = 1;
    public static final int STATE_CANCEL = 2;

    @TableId(type = IdType.AUTO)
    private Integer id;
    private String txId;
    private int freezeMoney;
    private int state = STATE_TRY;
}
DAO layer
package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Account;

public interface AccountMapper  extends BaseMapper<Account> {
}

package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.AccountTX;

public interface AccountTXMapper extends BaseMapper<AccountTX> {
}
Service layer
Interfaces
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Account;

public interface IAccountService  extends IService<Account> {

    /**
     * Deduct money from an account
     * @param userId user whose account is charged
     * @param money  amount to deduct
     */
    void reduce(String userId, int money);
}
package org.example.service;


import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * TCC two-phase business interface
 */
@LocalTCC
public interface IAccountTCCService {
    /**
     * Try: reserve (pre-deduct) the money
     */
    @TwoPhaseBusinessAction(name="tryReduce", commitMethod = "confirm", rollbackMethod = "cancel")
    void tryReduce(@BusinessActionContextParameter(paramName = "userId") String userId,
                   @BusinessActionContextParameter(paramName = "money") int money);
    /**
     * Confirm: commit
     * @param ctx action context carrying the xid and the Try parameters
     * @return true if the commit succeeded
     */
    boolean confirm(BusinessActionContext ctx);
    /**
     * Cancel: roll back
     * @param ctx action context carrying the xid and the Try parameters
     * @return true if the rollback succeeded
     */
    boolean cancel(BusinessActionContext ctx);
}
Implementations
package org.example.service;

import org.example.dao.AccountMapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.entity.Account;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {

    @Override
    @Transactional
    public void reduce(String userId, int money) {
        Account one = lambdaQuery().eq(Account::getUserId, userId).one();
        // Fail fast if the account is missing instead of silently updating zero rows
        if (one == null) {
            throw new RuntimeException("Account not found: " + userId);
        }
        if (one.getMoney() < money) {
            throw new RuntimeException("Not Enough Money ...");
        }
        lambdaUpdate().setSql("money = money - " + money)
                        .eq(Account::getUserId, userId)
                        .update();

    }
}
package org.example.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.example.dao.AccountMapper;
import org.example.dao.AccountTXMapper;
import org.example.entity.Account;
import org.example.entity.AccountTX;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class AccountTCCServiceImpl  implements IAccountTCCService {
    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private AccountTXMapper accountTXMapper;

    // The balance update and the t_account_tx insert should succeed or fail together
    @Override
    @Transactional
    public void tryReduce(String userId, int money) {
        System.err.println("-----------tryReduce-------------" + RootContext.getXID());
        // Anti-suspension check
        AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, RootContext.getXID()));
        if (accountTX != null){
            // A record already exists, so Cancel has already run: refuse the Try
            return;
        }
        Account one = accountMapper.selectOne(new LambdaQueryWrapper<Account>().eq(Account::getUserId, userId));
        // Fail fast if the account is missing instead of silently updating zero rows
        if (one == null) {
            throw new RuntimeException("Account not found: " + userId);
        }
        if (one.getMoney() < money) {
            throw new RuntimeException("Not Enough Money ...");
        }
        LambdaUpdateWrapper<Account> wrapper = new LambdaUpdateWrapper<>();
        wrapper.setSql("money = money - " + money);
        wrapper.eq(Account::getUserId, userId);

        accountMapper.update(null, wrapper);

        // Record the Try so that Cancel knows how much to restore
        AccountTX tx = new AccountTX();
        tx.setFreezeMoney(money);
        tx.setTxId(RootContext.getXID());
        tx.setState(AccountTX.STATE_TRY);

        accountTXMapper.insert(tx);

    }

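    @Override
    public boolean confirm(BusinessActionContext ctx) {
        System.err.println("-----------confirm-------------");
        // Delete the Try record. Return true even if no row was deleted,
        // so that a repeated Confirm stays idempotent instead of being retried forever.
        accountTXMapper.delete(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid()));
        return true;

    }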
    @Override
    public boolean cancel(BusinessActionContext ctx) {
        System.err.println("-----------cancel-------------");
        String userId = ctx.getActionContext("userId").toString();
        String money = ctx.getActionContext("money").toString();

        AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid()));
        if (accountTX == null){
            // No record: Try never ran, so this is an empty rollback. Record it to block a late Try.
            accountTX = new AccountTX();
            accountTX.setTxId(ctx.getXid());
            accountTX.setState(AccountTX.STATE_CANCEL);
            if(money != null){
                accountTX.setFreezeMoney(Integer.parseInt(money));
            }
            accountTXMapper.insert(accountTX);
            return true;
        }
        // Idempotency: Cancel has already been applied
        if(accountTX.getState() == AccountTX.STATE_CANCEL){
            return true;
        }

        // Restore the balance
        accountMapper.update(null, new LambdaUpdateWrapper<Account>()
                        .setSql("money = money + " + money)
                .eq(Account::getUserId, userId));

        accountTX.setFreezeMoney(0);
        accountTX.setState(AccountTX.STATE_CANCEL);
        int ret = accountTXMapper.updateById(accountTX);
        return ret == 1;
    }
}
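
The three defensive checks in the class above (anti-suspension in Try, empty rollback and idempotency in Cancel) can be exercised without Seata or a database. The sketch below mirrors that logic with in-memory maps standing in for t_account and t_account_tx; TccAccountSketch and its method signatures are hypothetical, and tryReduce returns a boolean here only so the refused case is observable.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory mirror of the Try/Confirm/Cancel bookkeeping; illustrative only. */
class TccAccountSketch {
    enum State { TRY, CANCEL }

    int balance;
    // xid -> state, standing in for t_account_tx.state
    final Map<String, State> txState = new HashMap<>();
    // xid -> frozen amount, standing in for t_account_tx.freeze_money
    final Map<String, Integer> frozen = new HashMap<>();

    TccAccountSketch(int balance) {
        this.balance = balance;
    }

    /** Try: reserve the money. Returns false when refused because a record already exists. */
    boolean tryReduce(String xid, int money) {
        if (txState.containsKey(xid)) {
            return false; // anti-suspension: Cancel (or an earlier Try) already ran
        }
        if (balance < money) {
            throw new IllegalStateException("Not Enough Money ...");
        }
        balance -= money;
        frozen.put(xid, money);
        txState.put(xid, State.TRY);
        return true;
    }

    /** Confirm: drop the record. Removing a missing key is harmless, so retries are safe. */
    boolean confirm(String xid) {
        txState.remove(xid);
        frozen.remove(xid);
        return true;
    }

    /** Cancel: handles empty rollback, repeated calls, and the normal restore. */
    boolean cancel(String xid) {
        State state = txState.get(xid);
        if (state == null) {
            txState.put(xid, State.CANCEL); // empty rollback: remember it to block a late Try
            return true;
        }
        if (state == State.CANCEL) {
            return true; // idempotent: already cancelled
        }
        balance += frozen.remove(xid);
        txState.put(xid, State.CANCEL);
        return true;
    }

    public static void main(String[] args) {
        TccAccountSketch account = new TccAccountSketch(900);
        account.tryReduce("xid-1", 100);
        account.cancel("xid-1");
        account.cancel("xid-1"); // repeated Cancel must not refund twice
        // prints 900
        System.out.println(account.balance);
    }
}
```

As in the original class, a Confirm removes the record entirely, so the sketch shares its limitation that a Try arriving after Confirm with the same xid would not be detected.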
Controller layer
package org.example.controller;

import org.example.service.IAccountTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("accounts")
public class AccountController {

    //@Autowired
    //private IAccountService accountService;
    @Autowired
    private IAccountTCCService accountTCCService;

    @GetMapping(value = "/reduce")
    public String reduce(String userId, int money) {
        try {
            accountTCCService.tryReduce(userId, money);
        } catch (Exception exx) {
            exx.printStackTrace();
            return "FAIL";
        }
        return "SUCCESS";
    }

}
Application class
package org.example;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;


@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo1Application
{
    public static void main( String[] args )
    {
        SpringApplication.run(SeataDemo1Application.class,args);
    }
}

seata-demo2
Entities
package org.example.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_order")
public class Order {
    @TableId(type = IdType.AUTO)
    private Integer id;
    private String userId;
    private String commodityCode;
    private Integer count;
    private Integer money;
}

package org.example.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_order_tx")
public class OrderTX {

    public static final int STATE_TRY = 0;
    public static final int STATE_CONFIRM = 1;
    public static final int STATE_CANCEL = 2;

    @TableId(type = IdType.AUTO)
    private Integer id;
    private String txId;
    private int state = STATE_TRY;
}
DAO layer
package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Order;

public interface OrderMapper  extends BaseMapper<Order> {
}

package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.OrderTX;

public interface OrderTXMapper extends BaseMapper<OrderTX> {
}
Service layer
Interfaces
package org.example.service;

import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Order;

public interface IOrderService  extends IService<Order> {

    /**
     * Create an order
     */
    void create(String userId, String commodityCode, int orderCount);
}
package org.example.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * TCC two-phase business interface
 */
@LocalTCC
public interface IOrderTCCService {
    /**
     * Try: create the order
     */
    @TwoPhaseBusinessAction(name="tryCreate", commitMethod = "confirm", rollbackMethod = "cancel")
    void tryCreate(@BusinessActionContextParameter(paramName = "userId") String userId,
                   @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
                   @BusinessActionContextParameter(paramName = "orderCount") int orderCount);

    /**
     * Confirm: commit
     * @param ctx action context carrying the xid and the Try parameters
     * @return true if the commit succeeded
     */
    boolean confirm(BusinessActionContext ctx);

    /**
     * Cancel: roll back
     * @param ctx action context carrying the xid and the Try parameters
     * @return true if the rollback succeeded
     */
    boolean cancel(BusinessActionContext ctx);

}
Implementations
package org.example.service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.entity.Order;
import org.example.feign.AccountFeignClient;
import org.example.dao.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    @Autowired
    private AccountFeignClient accountFeignClient;

    @Override
    @Transactional
    public void create(String userId, String commodityCode, int count) {
        // 定单总价 = 订购数量(count) * 商品单价(100)
        int orderMoney = count * 100;
        // 生成订单
        Order order = new Order();
        order.setCount(count);
        order.setCommodityCode(commodityCode);
        order.setUserId(userId);
        order.setMoney(orderMoney);
        super.save(order);

        // Call the account service to deduct the balance
        String result = accountFeignClient.reduce(userId, orderMoney);
        if (!"SUCCESS".equals(result)) {
            throw new RuntimeException("Failed to call Account Service. ");
        }

    }
}
package org.example.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.example.dao.OrderMapper;
import org.example.dao.OrderTXMapper;
import org.example.entity.Order;
import org.example.entity.OrderTX;
import org.example.feign.AccountFeignClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderTCCServiceImpl  implements IOrderTCCService {

    @Autowired
    private AccountFeignClient accountFeignClient;

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderTXMapper orderTXMapper;

    @Override
    public void tryCreate(String userId, String commodityCode, int count) {
        System.err.println("---------tryCreate-----------");

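        // Anti-suspension check: look for an existing record for this global transaction
        OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, RootContext.getXID()));
        if (orderTX != null){
            // A record exists, meaning cancel has already run for this XID; refuse to execute try
            return;
        }

        // Order total = quantity (count) * unit price (100)
        int orderMoney = count * 100;
        // Create the order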
        Order order = new Order();
        order.setCount(count);
        order.setCommodityCode(commodityCode);
        order.setUserId(userId);
        order.setMoney(orderMoney);
        orderMapper.insert(order);

        OrderTX tx = new OrderTX();
        tx.setTxId(RootContext.getXID());
        tx.setState(OrderTX.STATE_TRY);
        orderTXMapper.insert(tx);

        // Call the account service to deduct the balance
        String result = accountFeignClient.reduce(userId, orderMoney);
        if (!"SUCCESS".equals(result)) {
            throw new RuntimeException("Failed to call Account Service. ");
        }
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        System.err.println("---------confirm-----------");

        // Delete the transaction record written during the try phase
        int ret = orderTXMapper.delete(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid()));
        return ret == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        System.err.println("---------cancel-----------" );
        String userId = ctx.getActionContext("userId").toString();
        String commodityCode = ctx.getActionContext("commodityCode").toString();
        OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid()));
        if (orderTX == null){
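            // No record: try never ran, so this is an empty rollback.
            // Record the cancel so that a late try can detect it and refuse to run.
            orderTX = new OrderTX();
            orderTX.setTxId(ctx.getXid());
            orderTX.setState(OrderTX.STATE_CANCEL);
            orderTXMapper.insert(orderTX);
            return true;
        }
        // Idempotency: cancel has already been applied for this XID
        if(orderTX.getState() == OrderTX.STATE_CANCEL){
            return true;
        }

        // Compensate: delete the order created in the try phase (matched by userId and commodityCode)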
        orderMapper.delete(new LambdaQueryWrapper<Order>().eq(Order::getUserId, userId).eq(Order::getCommodityCode, commodityCode));

        orderTX.setState(OrderTX.STATE_CANCEL);
        int ret = orderTXMapper.updateById(orderTX);
        return ret == 1;
    }
}
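
The three guards in the class above (anti-suspension in try, empty rollback and idempotency in cancel) are easier to follow in isolation. The following is a minimal in-memory sketch, not Seata API: `TccGuardSketch`, its `State` enum, and the returned strings are hypothetical names, and a `HashMap` stands in for the `OrderTX` table.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the TCC guard records; a HashMap stands in for the OrderTX table. */
class TccGuardSketch {

    enum State { TRY, CANCEL }

    // Key: global transaction id (XID); value: last recorded phase.
    private final Map<String, State> txTable = new HashMap<>();

    /** Returns true if the try phase may run its business logic for this xid. */
    boolean tryPhase(String xid) {
        if (txTable.containsKey(xid)) {
            // A record already exists (for example cancel arrived first): refuse, to avoid suspension.
            return false;
        }
        txTable.put(xid, State.TRY);
        return true;
    }

    /** Returns which branch the cancel phase took for this xid. */
    String cancelPhase(String xid) {
        State state = txTable.get(xid);
        if (state == null) {
            // Try never ran: empty rollback. Record it so a late try is rejected.
            txTable.put(xid, State.CANCEL);
            return "EMPTY_ROLLBACK";
        }
        if (state == State.CANCEL) {
            // Cancel was already applied: idempotent no-op.
            return "ALREADY_CANCELLED";
        }
        // Normal path: undo the try-phase work, then mark the record as cancelled.
        txTable.put(xid, State.CANCEL);
        return "COMPENSATED";
    }

    public static void main(String[] args) {
        TccGuardSketch guard = new TccGuardSketch();
        System.out.println(guard.cancelPhase("tx1")); // cancel before try
        System.out.println(guard.tryPhase("tx1"));    // late try for the same xid
        System.out.println(guard.tryPhase("tx2"));    // normal try
        System.out.println(guard.cancelPhase("tx2")); // normal cancel
        System.out.println(guard.cancelPhase("tx2")); // repeated cancel
    }
}
```

In the real service the lookup and the insert would need to happen in the same local transaction as the business writes; the sketch leaves that out.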
Controller layer
package org.example.controller;

import org.example.service.IOrderTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("orders")
public class OrderController {

    @Autowired
    private IOrderTCCService orderTCCService;

    @GetMapping(value = "/create")
    public String create(String userId, String commodityCode, int orderCount) {
        try {
            orderTCCService.tryCreate(userId, commodityCode, orderCount);
        } catch (Exception exx) {
            exx.printStackTrace();
            return "FAIL";
        }
        return "SUCCESS";
    }
}
Feign layer
package org.example.feign;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(name = "seata-demo2-service")
public interface AccountFeignClient {
    @GetMapping("/accounts/reduce")
    String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money);


}
Startup class
package org.example;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;


@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo2Application
{
    public static void main( String[] args )
    {
        SpringApplication.run(SeataDemo2Application.class,args);
    }
}

Testing

Normal case: http://localhost:8088/businesses/purchase?rollback=false&count=2

Insufficient stock: http://localhost:8088/businesses/purchase?rollback=false&count=12

Insufficient balance: http://localhost:8088/businesses/purchase?rollback=false&count=8


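Saga mode:

        Saga mode is suited to long-running transactions, such as flows that call a bank's API, and to other complex, multi-step transaction processing.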
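
To make the idea concrete, here is a minimal sketch of the general saga pattern: each step commits locally and has a compensation, and when a step fails the completed steps are compensated in reverse order. It is plain Java and does not use Seata's Saga mode, which is driven by a state machine definition; `SagaSketch`, `Step`, and the step names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Minimal in-memory saga: run steps in order, compensate completed steps in reverse on failure. */
class SagaSketch {

    /** One saga step: a forward action plus the compensation that undoes it. */
    static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Returns true if every step succeeded, false if the saga failed and was compensated. */
    static boolean run(List<Step> steps, List<String> log) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                log.add("done:" + step.name);
                completed.push(step);
            } catch (RuntimeException e) {
                log.add("failed:" + step.name);
                // Undo the already-committed steps, most recent first.
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensation.run();
                    log.add("compensated:" + done.name);
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Step> steps = Arrays.asList(
                new Step("createOrder", () -> { }, () -> { }),
                new Step("reduceBalance",
                        () -> { throw new IllegalStateException("insufficient balance"); },
                        () -> { }));
        boolean ok = run(steps, log);
        System.out.println(ok + " " + log);
    }
}
```

Unlike TCC, nothing is reserved up front: each step's effect is visible as soon as it commits, which is why the compensations must be written to tolerate intermediate states.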

