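Integrating Seata with SpringCloud: XA, AT, TCC, and SAGA modes
References:
Setting up SpringCloud-Alibaba
Integrating SpringCloud with Nacos
Deploying Seata
Reference demo (and study materials)
Seata official website
Reference video 1 (its explanation of UNDO_LOG in AT mode may be slightly off, but it is very easy to follow)
Reference video 2 (harder to follow)
Gupao paid video (essentially reads the official documentation aloud)
Reference materials for the three videos above
Environment setup:
This tutorial assumes the following environment is already in place; if it is not, see the tutorials above:
- Nacos is deployed
- Seata is deployed
- SpringCloud is set up
- The versions are aligned (it is enough that the jar versions used in the JVM match the deployed versions)
- Compatible version combinations are listed in the Wiki
Simulated scenario - building the demo:
Seata handles distributed transactions, so this example needs several microservices working against several databases.
Creating the databases
First, create two databases: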
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db1` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */;
USE `seata_db1`;
/*Table structure for table `money` */
DROP TABLE IF EXISTS `money`;
CREATE TABLE `money` (
`id` varchar(32) NOT NULL COMMENT '主键',
`user_name` varchar(32) DEFAULT NULL COMMENT '名字',
`bank_money` double DEFAULT NULL COMMENT '银行余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*Data for the table `money` */
insert into `money`(`id`,`user_name`,`bank_money`) values
('084d862fa66c4c82886a4b0bb9214ab1','张三',100);
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */;
USE `seata_db2`;
/*Table structure for table `order_details` */
DROP TABLE IF EXISTS `order_details`;
CREATE TABLE `order_details` (
`id` varchar(32) NOT NULL COMMENT '主键',
`goods_name` varchar(32) DEFAULT NULL COMMENT '商品名',
`count` int(11) DEFAULT NULL COMMENT '仓库余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*Data for the table `order_details` */
insert into `order_details`(`id`,`goods_name`,`count`) values
('7072809e86834335843bc918c33074ec','desk',50);
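Creating the sample microservices
Use the SpringCloud scaffold built earlier, which already integrates Nacos, with Seata already deployed.
Creating a microservice is not covered again here; see the referenced article.
Following that article, create two microservices, seata-demo1 and seata-demo2, then import the required dependencies.
Here all dependencies are added to a shared service; adapt this to your own project. In the pom.xml of common-service:
Lombok dependency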
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
Web (controller layer) dependency
<!-- springboot starter web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
mybatis-plus and database dependencies
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1.tmp</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
Feign component
<!-- Feign component -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
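Nacos dependencies (the version is managed by default; if your Nacos is not from the version series above, specify the version explicitly)
<!-- Nacos client (service discovery) dependency -->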
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Nacos config dependency -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
Helper dependencies needed for starting up with bootstrap.yml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
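That completes the dependencies added to common-service.
Next, add the shared module as a dependency of both microservices, for example: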
<dependency>
<groupId>org.example</groupId>
<artifactId>common-service</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
Hand the configuration over to Nacos (see the referenced article); only the code is listed here.
- seata-demo1
bootstrap.yml
spring:
  profiles:
    include:
      uat
bootstrap-uat.yml
spring:
  config:
    use-legacy-processing: true
  profiles.active: uat
  application:
    name: seata-demo1
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        group: DEFAULT_GROUP
        username: nacos
        password: nacos
        file-extension: yaml
        refresh-enabled: true
Then put the following configuration into Nacos:
server:
  port: 8081
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        username: nacos
        password: nacos
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8
    username: root
    password: 123888
- seata-demo2
bootstrap.yml
spring:
  profiles:
    include:
      uat
bootstrap-uat.yml
spring:
  config:
    use-legacy-processing: true
  profiles.active: uat
  application:
    name: seata-demo2
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
        group: DEFAULT_GROUP
        username: nacos
        password: nacos
        file-extension: yaml
        refresh-enabled: true
Then put the following configuration into Nacos:
server:
  port: 8080
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        username: nacos
        password: nacos
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8
    username: root
    password: 123888
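With this in place, the services can connect to their databases. Next, write the three mybatis-plus layers (see the referenced article); only the code is listed here.
- seata-demo1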
package org.example.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("money")
public class Money {
@TableId
private String id;
@TableField("user_name")
private String userName;
@TableField("bank_money")
private Double bankMoney;
}
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Money;
public interface MoneyMapper extends BaseMapper<Money> {
}
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Money;
public interface MoneyService extends IService<Money> {
}
package org.example.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.example.dao.MoneyMapper;
import org.example.entity.Money;
import org.springframework.transaction.annotation.Transactional;
@Service
public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{
}
- seata-demo2
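package org.example.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data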
@TableName("order_details")
public class OrderDetails {
@TableId
private String id;
@TableField("goods_name")
private String goodsName;
@TableField("count")
private int count;
}
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.OrderDetails;
public interface OrderDetailsMapper extends BaseMapper<OrderDetails> {
}
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.OrderDetails;
public interface OrderDetailsService extends IService<OrderDetails> {
}
Next, write the business code: a call across services that simulates deducting money and deducting stock.
- seata-demo1
Feign client
package org.example.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
// The value must match the name the target service registers in Nacos (spring.application.name)
@FeignClient(value = "seata-demo2")
public interface Demo2Client {
@RequestMapping("order/addCount")
ResponseEntity<String> addCount(@RequestParam("count") int count);
}
MoneyService
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Money;
public interface MoneyService extends IService<Money> {
void changeMoney(Money money,Double num);
}
MoneyServiceImpl
package org.example.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.example.dao.MoneyMapper;
import org.example.entity.Money;
@Service
public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{
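@Override
public void changeMoney(Money money, Double num) {
Double bankMoney = money.getBankMoney();
// Deduct only when the balance covers the amount; otherwise fail so the caller can roll back
if (bankMoney >= num) {
money.setBankMoney(bankMoney - num);
this.updateById(money);
}else{
throw new RuntimeException("Insufficient balance");
}
}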
}
TransService
package org.example.service;
public interface TransService {
void globalRollBackDemo(int orderCount);
}
TransServiceImpl
package org.example.service;
import lombok.extern.slf4j.Slf4j;
import org.example.client.Demo2Client;
import org.example.entity.Money;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class TransServiceImpl implements TransService {
@Autowired
private MoneyService moneyService;
@Autowired
private Demo2Client demo2Client;
/**
* Distributed transaction rollback example
*/
@Override
public void globalRollBackDemo(int orderCount) {
log.info("================= Start deducting money ========================");
List<Money> list = moneyService.list();
Money money = list.get(0);
Double bankMoney = money.getBankMoney();
log.info("================= Bank balance: " + bankMoney + ", deducting 20 =================");
moneyService.changeMoney(money,20D);
log.info("================= Money deducted, now deducting stock by " + orderCount + " ========================");
demo2Client.addCount(orderCount);
}
}
MoneyController
package org.example.controller;
import org.example.client.Demo2Client;
import org.example.service.TransService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequestMapping("money")
public class MoneyController {
@Autowired
private TransService transService;
@RequestMapping("test/globalTransactional/fail")
public ResponseEntity<String> testGTRollBack() {
transService.globalRollBackDemo(60);
return new ResponseEntity<String>("success",HttpStatus.OK);
}
@RequestMapping("test/globalTransactional/sucess")
public ResponseEntity<String> testGTSucess() {
transService.globalRollBackDemo(40);
return new ResponseEntity<String>("success",HttpStatus.OK);
}
}
SeataDemo1Application
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo1Application
{
public static void main( String[] args )
{
SpringApplication.run(SeataDemo1Application.class,args);
}
}
- seata-demo2
OrderDetailsController
package org.example.controller;
import org.example.entity.OrderDetails;
import org.example.service.OrderDetailsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@Controller
@RequestMapping("order")
public class OrderDetailsController {
@Autowired
private OrderDetailsService orderService;
@RequestMapping("addCount")
public ResponseEntity<String> addCount(@RequestParam("count") Integer count) {
orderService.updateOrderDetails(count);
return new ResponseEntity<String>("success",HttpStatus.OK);
}
}
OrderDetailsService
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.OrderDetails;
public interface OrderDetailsService extends IService<OrderDetails> {
// Update the remaining stock
void updateOrderDetails(Integer count);
}
OrderDetailsServiceImpl
package org.example.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.example.dao.OrderDetailsMapper;
import org.example.entity.OrderDetails;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class OrderDetailsServiceImpl extends ServiceImpl<OrderDetailsMapper, OrderDetails> implements OrderDetailsService {
@Override
public void updateOrderDetails(Integer count) {
List<OrderDetails> list = this.list();
// Operate on the first record
OrderDetails orderDetails = list.get(0);
if (orderDetails.getCount()>=count) {
int sum = orderDetails.getCount() - count;
orderDetails.setCount(sum);
this.updateById(orderDetails);
log.info("Stock deducted successfully");
}else{
throw new RuntimeException("Insufficient stock (" + orderDetails.getCount() + "), rolling back");
}
}
}
}
SeataDemo2Application
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo2Application
{
public static void main( String[] args )
{
SpringApplication.run(SeataDemo2Application.class,args);
}
}
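Before adding Seata, it helps to see what the demo does when nothing coordinates the two services. The following is a minimal plain-Java model of the flow above, with no Spring, Seata, or database involved; the class name NoGlobalTxSketch and its static fields are made up for illustration, and only the numbers (balance 100, stock 50, deduct 20, order count 60) come from the demo data.

```java
// Plain-Java model of the demo flow; an illustration, not the real services.
class NoGlobalTxSketch {
    static double bankMoney = 100;   // initial balance from the money table
    static int stock = 50;           // initial count from the order_details table

    // Mirrors MoneyServiceImpl.changeMoney: local check, then local update.
    static void changeMoney(double num) {
        if (bankMoney < num) throw new RuntimeException("insufficient balance");
        bankMoney -= num;
    }

    // Mirrors OrderDetailsServiceImpl.updateOrderDetails in the other service.
    static void updateOrderDetails(int count) {
        if (stock < count) throw new RuntimeException("insufficient stock");
        stock -= count;
    }

    // Mirrors TransServiceImpl.globalRollBackDemo without any global transaction.
    static void globalRollBackDemo(int orderCount) {
        changeMoney(20);
        updateOrderDetails(orderCount);
    }

    public static void main(String[] args) {
        try {
            globalRollBackDemo(60);   // 60 > 50, so the stock step fails
        } catch (RuntimeException e) {
            System.out.println("call failed: " + e.getMessage());
        }
        // prints "bankMoney=80.0, stock=50": the money is gone although the order failed
        System.out.println("bankMoney=" + bankMoney + ", stock=" + stock);
    }
}
```

The failed stock step leaves the earlier deduction in place, which is exactly the inconsistency the transaction modes below are meant to prevent.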
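XA mode:
Importing the dependency
- The Seata jar can be imported either in the shared service or in each individual service
- If the jar version belongs to the same release series as Spring-Cloud (see version selection), the version can be omitted; otherwise it must be specified, and it must match the deployed Seata version
- This tutorial puts the jar in the shared service and specifies the version explicitly
<!-- Be sure to import the right artifact: the spring-cloud flavour of the Seata starter, not the plain springboot one -->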
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<!-- Exclude the Seata version that springcloud pulls in by default, to avoid version mismatches -->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.7.0</version>
</dependency>
Adding the Seata configuration
In the Nacos-managed configuration of seata-demo1 and seata-demo2, add the following:
# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: XA
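Enabling distributed transactions in XA mode
- Annotate the entry point of the distributed transaction with @GlobalTransactional
- For the local transactions, adding @Transactional is recommended
- For example, to make the deduct-money and deduct-stock flow above a distributed transaction, add @GlobalTransactional to TransServiceImpl in seata-demo1
- Then annotate the local transactions with @Transactional
- For example: MoneyServiceImpl in seata-demo1 and OrderDetailsServiceImpl in seata-demo2
That is all that is needed to run the demo in XA mode.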
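To make the idea behind XA mode concrete, here is a small in-memory sketch of a two-phase commit: every branch first prepares its change without making it visible, and a coordinator commits all branches only if every one of them voted yes. This is a conceptual model under simplifying assumptions, not how Seata or a database implements XA; the names TwoPhaseCommitSketch, Branch, and run are invented for the example.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual two-phase commit in memory; not Seata's implementation.
class TwoPhaseCommitSketch {
    /** One branch with a single numeric value, standing in for one service's database. */
    static class Branch {
        final String name;
        int committed;     // value visible to everyone
        Integer pending;   // value staged in phase one, not yet visible

        Branch(String name, int initial) {
            this.name = name;
            this.committed = initial;
        }

        // Phase one: validate and stage the change, then vote.
        boolean prepare(int delta) {
            if (committed < delta) return false;
            pending = committed - delta;
            return true;
        }

        // Phase two: make the staged value visible, or discard it.
        void commit() {
            if (pending != null) { committed = pending; pending = null; }
        }

        void rollback() { pending = null; }
    }

    /** Coordinator: commit only when every branch voted yes, otherwise roll all back. */
    static boolean run(List<Branch> branches, int[] deltas) {
        boolean allPrepared = true;
        for (int i = 0; i < branches.size(); i++) {
            if (!branches.get(i).prepare(deltas[i])) { allPrepared = false; break; }
        }
        for (Branch b : branches) {
            if (allPrepared) b.commit(); else b.rollback();
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        Branch money = new Branch("money", 100);
        Branch stock = new Branch("stock", 50);
        List<Branch> branches = Arrays.asList(money, stock);
        // Order count 60 exceeds the stock of 50: nothing changes. Prints "false 100 50".
        System.out.println(run(branches, new int[]{20, 60}) + " " + money.committed + " " + stock.committed);
        // Order count 40 fits: both branches commit. Prints "true 80 10".
        System.out.println(run(branches, new int[]{20, 40}) + " " + money.committed + " " + stock.committed);
    }
}
```

With the demo's numbers, the failing request (order count 60) leaves both values untouched, while the succeeding one (order count 40) changes both together.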
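AT mode:
AT mode is implemented almost exactly like XA mode: just change data-source-proxy-mode from XA to AT in the YAML file. AT mode also relies on an undo_log table in each business database; the creation script is in the Seata documentation.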
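The main difference from XA is what happens in phase one: in AT mode the local change is committed immediately, together with an undo record holding the row's before and after images, and a later rollback restores the before image. The sketch below models that idea with in-memory maps. It is a simplified illustration, not Seata's undo log format; AtUndoLogSketch, UndoRecord, and the row key are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of AT-mode undo logging; not Seata's actual data structures.
class AtUndoLogSketch {
    /** Before and after image of one row, recorded for one global transaction. */
    static class UndoRecord {
        final String rowId;
        final int before;
        final int after;

        UndoRecord(String rowId, int before, int after) {
            this.rowId = rowId;
            this.before = before;
            this.after = after;
        }
    }

    static final Map<String, Integer> table = new HashMap<>();       // row id -> value
    static final Map<String, UndoRecord> undoLog = new HashMap<>();  // xid -> undo record

    // Phase one: apply the update right away and record both images.
    static void update(String xid, String rowId, int newValue) {
        int before = table.get(rowId);
        table.put(rowId, newValue);
        undoLog.put(xid, new UndoRecord(rowId, before, newValue));
    }

    // Phase two, commit: the data is already in place, so only the log is dropped.
    static void commit(String xid) {
        undoLog.remove(xid);
    }

    // Phase two, rollback: restore the before image unless the row changed in between.
    static boolean rollback(String xid) {
        UndoRecord r = undoLog.get(xid);
        if (r == null) return true;                       // nothing to undo
        if (table.get(r.rowId) != r.after) return false;  // dirty write, cannot undo blindly
        table.put(r.rowId, r.before);
        undoLog.remove(xid);
        return true;
    }

    public static void main(String[] args) {
        table.put("account-1", 100);
        update("xid-1", "account-1", 80);
        System.out.println(table.get("account-1"));  // prints 80: visible before phase two
        rollback("xid-1");
        System.out.println(table.get("account-1"));  // prints 100: before image restored
    }
}
```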
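TCC mode:
The principles behind TCC mode (business suspension, empty rollback, the TCC-specific annotations, and so on) are not covered here; see the videos above for details. This section only shows how to implement it.
Building on the scaffold above, follow the steps below to write the TCC version.
TCC mode here is adapted from the AT mode setup, so the YAML is written the same way as for AT mode.
Updating the YAML configuration
seata-demo1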
server:
  port: 8081
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        username: nacos
        password: nacos
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8
    username: root
    password: 123888
# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: AT
seata-demo2
server:
  port: 8080
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        username: nacos
        password: nacos
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8
    username: root
    password: 123888
# Seata client configuration
seata:
  enabled: true
  application-id: seata_tx
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      group: SEATA_GROUP
  data-source-proxy-mode: AT
Creating the tables
In the seata_db1 database created above, create two tables: t_account and t_account_tx
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_account
-- ----------------------------
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account` (
`id` INT NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`money` INT NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of t_account
-- ----------------------------
INSERT INTO `t_account` VALUES (1, 'U100000', 900);
-- ----------------------------
-- Table structure for t_account_tx
-- ----------------------------
DROP TABLE IF EXISTS `t_account_tx`;
CREATE TABLE `t_account_tx` (
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
`tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '事务id',
`freeze_money` INT NULL DEFAULT NULL COMMENT '冻结金额',
`state` INT NULL DEFAULT NULL COMMENT '状态 0try 1confirm 2cancel',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;
In the seata_db2 database created above, create two tables: t_order and t_order_tx
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`id` INT NOT NULL AUTO_INCREMENT,
`user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`commodity_code` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`count` INT NULL DEFAULT 0,
`money` INT NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 25 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of t_order
-- ----------------------------
-- ----------------------------
-- Table structure for t_order_tx
-- ----------------------------
DROP TABLE IF EXISTS `t_order_tx`;
CREATE TABLE `t_order_tx` (
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键',
`tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '事务id',
`state` INT NULL DEFAULT NULL COMMENT '状态 0try 1confirm 2cancel',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;
Writing the three Mybatis-plus layers and the business layer
seata-demo1
Entities
package org.example.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("t_account")
public class Account {
@TableId(type = IdType.AUTO)
private Integer id;
private String userId;
private int money;
}
package org.example.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("t_account_tx")
public class AccountTX {
public static final int STATE_TRY = 0;
public static final int STATE_CONFIRM = 1;
public static final int STATE_CANCEL = 2;
@TableId(type = IdType.AUTO)
private Integer id;
private String txId;
private int freezeMoney;
private int state = STATE_TRY;
}
DAO layer
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Account;
public interface AccountMapper extends BaseMapper<Account> {
}
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.AccountTX;
public interface AccountTXMapper extends BaseMapper<AccountTX> {
}
Service layer
Interfaces
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Account;
public interface IAccountService extends IService<Account> {
/**
* Deduct money from an account
* @param userId the user id of the account
* @param money the amount to deduct
*/
void reduce(String userId, int money);
}
package org.example.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
* TCC two-phase business interface
*/
@LocalTCC
public interface IAccountTCCService {
/**
* try: pre-deduct the money
*/
@TwoPhaseBusinessAction(name="tryReduce", commitMethod = "confirm", rollbackMethod = "cancel")
void tryReduce(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
/**
* confirm: commit
* @param ctx the TCC action context
* @return true if the confirm succeeded
*/
boolean confirm(BusinessActionContext ctx);
/**
* cancel: roll back
* @param ctx the TCC action context
* @return true if the cancel succeeded
*/
boolean cancel(BusinessActionContext ctx);
}
Implementations
package org.example.service;
import org.example.dao.AccountMapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.entity.Account;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {
@Override
@Transactional
public void reduce(String userId, int money) {
Account one = lambdaQuery().eq(Account::getUserId, userId).one();
if(one != null && one.getMoney() < money){
throw new RuntimeException("Not Enough Money ...");
}
lambdaUpdate().setSql("money = money - " + money)
.eq(Account::getUserId, userId)
.update();
}
}
package org.example.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.example.dao.AccountMapper;
import org.example.dao.AccountTXMapper;
import org.example.entity.Account;
import org.example.entity.AccountTX;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class AccountTCCServiceImpl implements IAccountTCCService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountTXMapper accountTXMapper;
@Override
public void tryReduce(String userId, int money) {
System.err.println("-----------tryReduce-------------" + RootContext.getXID());
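// Suspension guard: look for an existing record of this global transaction
AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, RootContext.getXID()));
if (accountTX != null){
// A record exists, so cancel has already run for this transaction: refuse the try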
return;
}
Account one = accountMapper.selectOne(new LambdaQueryWrapper<Account>().eq(Account::getUserId, userId));
if(one != null && one.getMoney() < money){
throw new RuntimeException("Not Enough Money ...");
}
LambdaUpdateWrapper<Account> wrapper = new LambdaUpdateWrapper<>();
wrapper.setSql("money = money - " + money);
wrapper.eq(Account::getUserId, userId);
accountMapper.update(null, wrapper);
AccountTX tx = new AccountTX();
tx.setFreezeMoney(money);
tx.setTxId(RootContext.getXID());
tx.setState(AccountTX.STATE_TRY);
accountTXMapper.insert(tx);
}
@Override
public boolean confirm(BusinessActionContext ctx) {
System.err.println("-----------confirm-------------");
// Delete the transaction record
int ret = accountTXMapper.delete(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid()));
return ret == 1;
}
@Override
public boolean cancel(BusinessActionContext ctx) {
System.err.println("-----------cancel-------------");
String userId = ctx.getActionContext("userId").toString();
String money = ctx.getActionContext("money").toString();
AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid()));
if (accountTX == null){
// No record found: empty rollback (cancel arrived without a prior try)
accountTX = new AccountTX();
accountTX.setTxId(ctx.getXid());
accountTX.setState(AccountTX.STATE_CANCEL);
if(money != null){
accountTX.setFreezeMoney(Integer.parseInt(money));
}
accountTXMapper.insert(accountTX);
return true;
}
// Idempotency: already cancelled, nothing more to do
if(accountTX.getState() == AccountTX.STATE_CANCEL){
return true;
}
// Restore the balance
accountMapper.update(null, new LambdaUpdateWrapper<Account>()
.setSql("money = money + " + money)
.eq(Account::getUserId, userId));
accountTX.setFreezeMoney(0);
accountTX.setState(AccountTX.STATE_CANCEL);
int ret = accountTXMapper.updateById(accountTX);
return ret == 1;
}
}
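The three safeguards in AccountTCCServiceImpl (suspension guard, empty rollback, idempotent cancel) are easier to follow without the database code around them. The sketch below replays the same decisions against in-memory state; it is an illustration only, with TccSafeguardSketch and its txTable map standing in for the t_account and t_account_tx tables. One deliberate difference is labeled in the code: confirm always reports success here so that a repeated confirm stays harmless, whereas the listing above returns ret == 1.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory replay of the TCC safeguards; a sketch, not the real service.
class TccSafeguardSketch {
    static final int STATE_TRY = 0;
    static final int STATE_CANCEL = 2;

    static int balance = 900;                                   // t_account.money of U100000
    static final Map<String, int[]> txTable = new HashMap<>();  // xid -> {freezeMoney, state}

    static boolean tryReduce(String xid, int money) {
        if (txTable.containsKey(xid)) return false;  // suspension guard: a record already exists
        if (balance < money) throw new RuntimeException("Not Enough Money ...");
        balance -= money;
        txTable.put(xid, new int[]{money, STATE_TRY});
        return true;
    }

    static boolean confirm(String xid) {
        txTable.remove(xid);  // assumption of this sketch: removing twice is fine, so always succeed
        return true;
    }

    static boolean cancel(String xid) {
        int[] tx = txTable.get(xid);
        if (tx == null) {                            // empty rollback: try never ran
            txTable.put(xid, new int[]{0, STATE_CANCEL});
            return true;
        }
        if (tx[1] == STATE_CANCEL) return true;      // idempotency: already cancelled
        balance += tx[0];                            // give the frozen amount back
        tx[0] = 0;
        tx[1] = STATE_CANCEL;
        return true;
    }

    public static void main(String[] args) {
        cancel("xid-1");                                // cancel arrives first (empty rollback)
        System.out.println(tryReduce("xid-1", 200));    // prints false: late try is refused
        System.out.println(tryReduce("xid-2", 200));    // prints true
        cancel("xid-2");
        cancel("xid-2");                                // repeated cancel changes nothing
        System.out.println(balance);                    // prints 900
    }
}
```

The record written during an empty rollback is what lets a late try recognise that its transaction is already over.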
Controller layer
package org.example.controller;
import org.example.service.IAccountTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("accounts")
public class AccountController {
//@Autowired
//private IAccountService accountService;
@Autowired
private IAccountTCCService accountTCCService;
@GetMapping(value = "/reduce")
public String reduce(String userId, int money) {
try {
accountTCCService.tryReduce(userId, money);
} catch (Exception exx) {
exx.printStackTrace();
return "FAIL";
}
return "SUCCESS";
}
}
Application class
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo1Application
{
public static void main( String[] args )
{
SpringApplication.run(SeataDemo1Application.class,args);
}
}
seata-demo2
Entities
package org.example.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.AUTO)
private Integer id;
private String userId;
private String commodityCode;
private Integer count;
private Integer money;
}
package org.example.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("t_order_tx")
public class OrderTX {
public static final int STATE_TRY = 0;
public static final int STATE_CONFIRM = 1;
public static final int STATE_CANCEL = 2;
@TableId(type = IdType.AUTO)
private Integer id;
private String txId;
private int state = STATE_TRY;
}
DAO layer
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.Order;
public interface OrderMapper extends BaseMapper<Order> {
}
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.example.entity.OrderTX;
public interface OrderTXMapper extends BaseMapper<OrderTX> {
}
Service layer
Interfaces
package org.example.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.example.entity.Order;
public interface IOrderService extends IService<Order> {
/**
* Create an order
*/
void create(String userId, String commodityCode, int orderCount);
}
package org.example.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/**
* TCC two-phase business interface
*/
@LocalTCC
public interface IOrderTCCService {
/**
* try: create the order
*/
@TwoPhaseBusinessAction(name="tryCreate", commitMethod = "confirm", rollbackMethod = "cancel")
void tryCreate(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
@BusinessActionContextParameter(paramName = "orderCount") int orderCount);
/**
* confirm: commit
* @param ctx the TCC action context
* @return true if the confirm succeeded
*/
boolean confirm(BusinessActionContext ctx);
/**
* cancel: roll back
* @param ctx the TCC action context
* @return true if the cancel succeeded
*/
boolean cancel(BusinessActionContext ctx);
}
Implementations
package org.example.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.entity.Order;
import org.example.feign.AccountFeignClient;
import org.example.dao.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Autowired
private AccountFeignClient accountFeignClient;
@Override
@Transactional
public void create(String userId, String commodityCode, int count) {
// Order total = quantity ordered (count) * unit price (100)
int orderMoney = count * 100;
// Create the order
Order order = new Order();
order.setCount(count);
order.setCommodityCode(commodityCode);
order.setUserId(userId);
order.setMoney(orderMoney);
super.save(order);
// Call the account service to deduct the balance
String result = accountFeignClient.reduce(userId, orderMoney);
if (!"SUCCESS".equals(result)) {
throw new RuntimeException("Failed to call Account Service. ");
}
}
}
package org.example.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.example.dao.OrderMapper;
import org.example.dao.OrderTXMapper;
import org.example.entity.Order;
import org.example.entity.OrderTX;
import org.example.feign.AccountFeignClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderTCCServiceImpl implements IOrderTCCService {
@Autowired
private AccountFeignClient accountFeignClient;
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderTXMapper orderTXMapper;
@Override
public void tryCreate(String userId, String commodityCode, int count) {
System.err.println("---------tryCreate-----------");
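// Suspension guard: look for an existing record of this global transaction
OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, RootContext.getXID()));
if (orderTX != null){
// A record exists, so cancel has already run for this transaction: refuse the try
return;
}
// Order total = quantity ordered (count) * unit price (100)
int orderMoney = count * 100;
// Create the order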
Order order = new Order();
order.setCount(count);
order.setCommodityCode(commodityCode);
order.setUserId(userId);
order.setMoney(orderMoney);
orderMapper.insert(order);
OrderTX tx = new OrderTX();
tx.setTxId(RootContext.getXID());
tx.setState(OrderTX.STATE_TRY);
orderTXMapper.insert(tx);
// Call the account service to deduct the balance
String result = accountFeignClient.reduce(userId, orderMoney);
if (!"SUCCESS".equals(result)) {
throw new RuntimeException("Failed to call Account Service. ");
}
}
@Override
public boolean confirm(BusinessActionContext ctx) {
System.err.println("---------confirm-----------");
// Delete the transaction record
int ret = orderTXMapper.delete(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid()));
return ret == 1;
}
@Override
public boolean cancel(BusinessActionContext ctx) {
System.err.println("---------cancel-----------" );
String userId = ctx.getActionContext("userId").toString();
String commodityCode = ctx.getActionContext("commodityCode").toString();
OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid()));
if (orderTX == null){
// No record found: empty rollback (cancel arrived without a prior try)
orderTX = new OrderTX();
orderTX.setTxId(ctx.getXid());
orderTX.setState(OrderTX.STATE_CANCEL);
orderTXMapper.insert(orderTX);
return true;
}
// Idempotency: already cancelled, nothing more to do
if(orderTX.getState() == OrderTX.STATE_CANCEL){
return true;
}
// Undo the try: delete the order that was created
orderMapper.delete(new LambdaQueryWrapper<Order>().eq(Order::getUserId, userId).eq(Order::getCommodityCode, commodityCode));
orderTX.setState(OrderTX.STATE_CANCEL);
int ret = orderTXMapper.updateById(orderTX);
return ret == 1;
}
}
Controller layer
package org.example.controller;
import org.example.service.IOrderTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("orders")
public class OrderController {
@Autowired
private IOrderTCCService orderTCCService;
@GetMapping(value = "/create")
public String create(String userId, String commodityCode, int orderCount) {
try {
orderTCCService.tryCreate(userId, commodityCode, orderCount);
} catch (Exception exx) {
exx.printStackTrace();
return "FAIL";
}
return "SUCCESS";
}
}
Feign layer
package org.example.feign;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
// /accounts/reduce is served by AccountController in seata-demo1, so the client targets that service name
@FeignClient(name = "seata-demo1")
public interface AccountFeignClient {
@GetMapping("/accounts/reduce")
String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money);
}
Application class
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("org.example.dao")
@EnableFeignClients
public class SeataDemo2Application
{
public static void main( String[] args )
{
SpringApplication.run(SeataDemo2Application.class,args);
}
}
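Testing
Normal case: http://localhost:8088/businesses/purchase?rollback=false&count=2
Stock exceeded: http://localhost:8088/businesses/purchase?rollback=false&count=12
Balance exceeded: http://localhost:8088/businesses/purchase?rollback=false&count=8
These URLs assume a business entry service on port 8088 exposing /businesses/purchase, which is not listed in this tutorial; with only the two services above, the entry point is OrderController (/orders/create) in seata-demo2.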
SAGA mode:
Saga mode suits long-running transactions, such as flows that call bank APIs or other lengthy, complex business processes.
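The core idea of a saga is that every step commits on its own and comes with a compensation; when a later step fails, the completed steps are compensated in reverse order. The sketch below shows only that idea in plain Java. Seata's Saga mode is driven by a state machine definition rather than code like this, so treat SagaSketch, Step, and run as hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Conceptual saga: forward steps plus compensations; not Seata's state machine engine.
class SagaSketch {
    /** One saga step: a forward action and the compensation that undoes it. */
    static class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs the steps in order; on failure, compensates the completed steps in reverse order. */
    static boolean run(List<Step> steps, List<String> trace) {
        Deque<Step> done = new ArrayDeque<>();
        for (Step s : steps) {
            try {
                s.action.run();
                trace.add(s.name);
                done.push(s);
            } catch (RuntimeException e) {
                while (!done.isEmpty()) {
                    Step c = done.pop();
                    c.compensation.run();
                    trace.add("undo " + c.name);
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] balance = {100};
        List<String> trace = new ArrayList<>();
        boolean ok = run(Arrays.asList(
                new Step("debit", () -> balance[0] -= 20, () -> balance[0] += 20),
                new Step("reserve stock", () -> { throw new RuntimeException("out of stock"); }, () -> { })
        ), trace);
        // prints "false 100 [debit, undo debit]"
        System.out.println(ok + " " + balance[0] + " " + trace);
    }
}
```

Unlike the two-phase sketches earlier, the debit here is really applied before it is undone, which is why saga compensations have to be written by hand for each step.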