Seata搭建
1.初识Seata
Quick Start | Apache Seata 官网
2.准备nacos和 seata
启动nacos
startup.cmd -m standalone
账号nacos 密码nacos
搭建seata TC
这里下载的 1.4.2
seata-server-1.4.2
1.修改seata配置文件
registry.conf
这里我们使用nacos作为注册中心 和 配置中心
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# 注册中心 nacos
type = "nacos"
# 注册中心的配置
nacos {
application = "seata-tc-server"
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
# 配置文件
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos" #还是放nacos上面
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
store.mode=db
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.181.202:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackFailedUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
2.创建数据库 创建表
建表SQL
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
3.运行
cmd 运行
seata-server.bat
win bat
mac sh
什么参数不加就是默认配置 默认端口 默认信息
nacos 注册完成
3.微服务集成Seata
比如在 pro 商品服务导入依赖
1.导入依赖
这时老版本 2.9.9 realease springboot版本 使用的 版本
<!--分布式事务-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<!--版本较低 1.3.0 因此排除-->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--seata starter 采用 1.4.2版本-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
因为自动装配,所以只需告诉服务 TC 服务的地址 在哪里
参考官网
2.yml配置
seata:
registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
#参考tc服务自己的 registry.conf 中的配置,
#包括:地址 namespace, group,application-name, cluster
type: nacos
nacos: #TC
server-addr: 127.0.0.1:8848
group: DEFAULT_GROUP
namespace: "" #因为都在public 所以用空串
application: seata-tc-server
username: "nacos"
password: "nacos"
tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
#把事务做分组 订单服务商品服务 它们是一个事务 想被同一个集群管理,它们就可以设置为同一个事务组
#方便将来做快速失败 集群容错的 管理
service:
vgroup-mapping: #事务组与TC服务cluster的映射关系 事务组怎么找到我的集群名称呢,那么将映射到default这个集群当中
seata-demo: default #没有集群默认是 default nacos里 seata-demo 映射 到 default
重启
把需要做分布式事务的服务都做一遍配置
重启
4.实现分布式事务
1.XA模式
基于数据库实现的 Mysql支持 XA 先让一个不提交 等所有都没有问题 然后同时提交
失败一个回滚所有,它是基于数据库本身的实现,来阻塞提交,性能不好
1.Seata的实现
优势
强一致性
缺点
可用性降低了
2.代码实现
在每个需要全局事务管理的服务加上
seata:
registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
#参考tc服务自己的 registry.conf 中的配置,
#包括:地址 namespace, group,application-name, cluster
type: nacos
nacos: #TC
server-addr: 127.0.0.1:8848
group: DEFAULT_GROUP
namespace: "" #因为都在public 所以用空串
application: seata-tc-server
username: "nacos"
password: "nacos"
tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
#把事务做分组 订单服务商品服务 它们是一个事务 想被同一个集群管理,它们就可以设置为同一个事务组
#方便将来做快速失败 集群容错的 管理
service:
vgroup-mapping: #事务组与TC服务cluster的映射关系 事务组怎么找到我的集群名称呢,那么将映射到default这个集群当中
seata-demo: default #没有集群默认是 default nacos里 seata-demo 映射 到 default
data-source-proxy-mode: XA #开启数据源代理的XA模式
加上GlobalTransaction注解
@GetMapping("/add")
@GlobalTransactional(rollbackFor = Exception.class)
public R addDingdan(@RequestParam("order") String order) {
SpuInfoEntity spuInfoEntity = new SpuInfoEntity();
spuInfoEntity.setSpuName(order);
spuInfoService.save(spuInfoEntity);
CouponHistoryEntity couponHistoryEntity = new CouponHistoryEntity();
couponHistoryEntity.setMemberNickName(order);
couponFeignService.saveTrySeata(couponHistoryEntity);
return R.ok();
}
另一个服务 也一定要加事务要不然会造成死锁
@RequestMapping("/save")
@Transactional(rollbackFor = Exception.class)
public R save(@RequestBody CouponHistoryEntity couponHistory) throws Exception {
couponHistoryService.save(couponHistory);
CouponHistoryEntity couponHistory1 = new CouponHistoryEntity();
BeanUtils.copyProperties(couponHistory,couponHistory1);
couponHistory1.setId(1011L);
couponHistoryService.save(couponHistory1);
CouponHistoryEntity couponHistory2 = new CouponHistoryEntity();
BeanUtils.copyProperties(couponHistory,couponHistory2);
couponHistory2.setId(1012L);
couponHistoryService.save(couponHistory2);
CouponHistoryEntity couponHistory3 = new CouponHistoryEntity();
BeanUtils.copyProperties(couponHistory,couponHistory3);
couponHistory3.setId(1013L);
couponHistoryService.save(couponHistory3);
return R.ok();
}
2.AT模式
1.AT模式原理
2.AT模式的脏写问题
全局锁的机制
3. 代码实现
上面一开始就导入了 lock_table 到 seata服务器所在的数据库
然后只需要导入 undo_log 表就可以了 每个用到分布式事务的微服务所在的数据库都必须要导入一张undo_log表
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
seata:
data-source-proxy-mode: AT
重启一下就完事了,无代码侵入
测试回滚成功,和上面XA一样的代码不用动
3.TCC模式
1.TCC模式原理
2.TCC代码实现
1.先建一张 冻结表
CREATE table `pms_spu_info_tbl`(
`xid` varchar(128) NOT NULL,
`user_id` varchar(255) Default null comment '用户ID',
`freeze_money` int unsigned default '0' comment '冻结金额',
`state` int default null comment '事务状态,0:try,1:confirm,2:cancel',
Primary key (`xid`) USING BTREE
) ENGINE = InnoDB Default charset=utf8;
2.try业务
记录冻结金额和事务状态到 pms_spu_info_tbl
添加金额
3.confirm业务
根据xid 删除account_freeze表的冻结记录
4.cancel业务
减少金额
根据xid 修改account_freeze回滚状态
state 2
5.如何判断是否空回滚
6.如何避免业务悬挂
3.声明TCC接口
package com.jmj.gulimall.product.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface AccountTCCService {
@TwoPhaseBusinessAction(name="induct",commitMethod = "commit",rollbackMethod = "rollback")
void induct(@BusinessActionContextParameter(paramName = "params") String params,
@BusinessActionContextParameter(paramName = "money") int money);
boolean commit(BusinessActionContext businessActionContext);
boolean rollback(BusinessActionContext businessActionContext);
}
package com.jmj.gulimall.product.tcc;
import com.jmj.gulimall.product.dao.PmsSpuInfoTblMapper;
import com.jmj.gulimall.product.entity.SpuInfoEntity;
import com.jmj.gulimall.product.service.SpuInfoService;
import com.jmj.gulimall.product.vo.spu.PmsSpuInfoTbl;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
public class AccountTCCServiceImpl implements AccountTCCService {
@Autowired
private SpuInfoService spuInfoService;
@Autowired
private PmsSpuInfoTblMapper pmsSpuInfoTblMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void induct(String params, int money) {
//0.获取全局事务ID
String xid = RootContext.getXID();
PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
if (pmsSpuInfoTbl != null) {
return ;
}
//1.扣减可用余额
SpuInfoEntity byId = spuInfoService.getById(9L);
byId.setPublishStatus(byId.getPublishStatus() + money);
spuInfoService.updateById(byId);
//2.记录冻结金额 事务状态
PmsSpuInfoTbl p = new PmsSpuInfoTbl();
p.setUserId("123");
p.setFreezeMoney(money);
p.setState(0);
p.setXid(xid);
pmsSpuInfoTblMapper.insert(p);
}
@Override
public boolean commit(BusinessActionContext businessActionContext) {
//0.获取全局事务ID
String xid = businessActionContext.getXid();
//1.删除冻结金额
return pmsSpuInfoTblMapper.deleteById(xid) == 1;
}
@Override
public boolean rollback(BusinessActionContext businessActionContext) {
//0.获取全局事务ID
String xid = businessActionContext.getXid();
PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
if (pmsSpuInfoTbl != null) {
if (pmsSpuInfoTbl.getState() ==2) {
//幂等操作
return true;
}
Integer freezeMoney = pmsSpuInfoTbl.getFreezeMoney();
//1.扣减可用余额
SpuInfoEntity byId = spuInfoService.getById(9L);
byId.setPublishStatus(byId.getPublishStatus() - freezeMoney);
spuInfoService.updateById(byId);
pmsSpuInfoTbl.setState(2);
pmsSpuInfoTblMapper.updateById(pmsSpuInfoTbl);
}else {
PmsSpuInfoTbl pmsSpuInfoTbl1 = new PmsSpuInfoTbl();
String params = businessActionContext.getActionContext("params").toString();
pmsSpuInfoTbl1.setUserId(params);
pmsSpuInfoTbl1.setState(2);
pmsSpuInfoTbl1.setFreezeMoney(0);
pmsSpuInfoTbl1.setXid(xid);
pmsSpuInfoTblMapper.insert(pmsSpuInfoTbl1);
}
return true;
}
}
@GetMapping("/testTcc")
public R testTcc() {
accountTCCService.induct("JMJ",12);
return R.ok();
}
package com.jmj.gulimall.coupon.controller;
import com.jmj.common.utils.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
@FeignClient(value = "gulimall-product",path = "/product/spuinfo")
public interface PmsFeignService {
@GetMapping("/testTcc")
public R testTcc();
}
@GetMapping("/tet/tcc")
@GlobalTransactional(rollbackFor = Exception.class)
public R tcc(){
CouponEntity couponEntity = new CouponEntity();
couponEntity.setCouponImg(UUID.randomUUID().toString());
couponEntity.setCouponName(UUID.randomUUID().toString());
couponService.save(couponEntity);
pmsFeignService.testTcc();
if (true){
throw new RuntimeException("测试异常");
}
return R.ok();
}
最终回滚
能和AT模式混用
4.SAGA模式
类似于工作流
无隔离性
5.SEATA高可用
1.实现微服务高可用和异地容灾
本地模拟复制一份 文件夹
把配置文件改成杭州集群
启动命令
seata-server.bat -p 8092
这样集群部署成功了
若出现容灾的情况下
利用nacos的配置热更新 就可以了
可以新建一个通用客户端配置
client.yaml
官网有详细配置
这里用我自己的简略版
#事务组的映射关系
service.vgroupMapping.seata-demo=default
service.enableDegrade=false
service.disableGlobalTransaction=false
#与TC服务的通信配置
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#rm配置
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
#tm配置
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
#undo配置
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
自动刷新 注解这里记录一下
/**
* 优惠券信息
*
* @author jiangmingji
* @email 123456789@qq.com
* @date 2024-03-21 21:24:36
*/
@RefreshScope //动态从配置中心获取配置
@RestController
@RequestMapping("/coupon/coupon")
public class CouponController {
@Autowired
private CouponService couponService;
@Value("${coupon.user.name}")
private String name;
@Value("${coupon.user.age}")
private Integer age;
微服务读取配置
这样给微服务 配上配置就可以了
seata:
data-source-proxy-mode: AT #开启数据源代理的XA模式
registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
#参考tc服务自己的 registry.conf 中的配置,
#包括:地址 namespace, group,application-name, cluster
type: nacos
nacos: #TC
server-addr: 127.0.0.1:8848
group: DEFAULT_GROUP
namespace: "" #因为都在public 所以用空串
application: seata-tc-server
username: "nacos"
password: "nacos"
tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group: 'SEATA_GROUP'
dataId: 'client.properties'
username: 'nacos'
password: 'nacos'
namespace: ""
重启服务
修改配置
将集群切换为杭州 HZ
成功注册到8092那个seata-server上面了