当前位置: 首页 > article >正文

Seata搭建

1.初识Seata

Quick Start | Apache Seata 官网

2.准备nacos和 seata

启动nacos

startup.cmd -m standalone

账号nacos 密码nacos

搭建seata TC

这里下载的 1.4.2 

seata-server-1.4.2

1.修改seata配置文件 

registry.conf

这里我们使用nacos作为注册中心 和 配置中心

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 注册中心 nacos
  type = "nacos"

# 注册中心的配置
  nacos {
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }

}

# 配置文件
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos" #还是放nacos上面

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
 
}

store.mode=db
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.181.202:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000


#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackFailedUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000

# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none

# 关闭metrics功能,提高性能
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

 

2.创建数据库 创建表 

建表SQL

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

3.运行 

cmd 运行 

seata-server.bat

win bat

mac sh

 什么参数不加就是默认配置 默认端口 默认信息

 nacos 注册完成

3.微服务集成Seata

比如在 pro 商品服务导入依赖

1.导入依赖 

这时老版本 2.9.9 realease springboot版本 使用的 版本

        <!--分布式事务-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <!--版本较低 1.3.0 因此排除-->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--seata starter 采用 1.4.2版本-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>

因为自动装配,所以只需告诉服务 TC 服务的地址 在哪里

参考官网

2.yml配置

seata:
  registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
    #参考tc服务自己的 registry.conf 中的配置,
    #包括:地址 namespace, group,application-name, cluster
    type: nacos
    nacos: #TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" #因为都在public 所以用空串
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
  #把事务做分组 订单服务商品服务 它们是一个事务 想被同一个集群管理,它们就可以设置为同一个事务组
  #方便将来做快速失败 集群容错的 管理
  service:
    vgroup-mapping: #事务组与TC服务cluster的映射关系 事务组怎么找到我的集群名称呢,那么将映射到default这个集群当中
      seata-demo: default #没有集群默认是 default nacos里   seata-demo 映射 到 default

重启

把需要做分布式事务的服务都做一遍配置

重启 

4.实现分布式事务

1.XA模式

 基于数据库实现的 Mysql支持 XA  先让一个不提交 等所有都没有问题 然后同时提交 

 失败一个回滚所有,它是基于数据库本身的实现,来阻塞提交,性能不好

 1.Seata的实现

 优势

强一致性

缺点

可用性降低了

2.代码实现

在每个需要全局事务管理的服务加上

seata:
  registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
    #参考tc服务自己的 registry.conf 中的配置,
    #包括:地址 namespace, group,application-name, cluster
    type: nacos
    nacos: #TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" #因为都在public 所以用空串
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
  #把事务做分组 订单服务商品服务 它们是一个事务 想被同一个集群管理,它们就可以设置为同一个事务组
  #方便将来做快速失败 集群容错的 管理
  service:
    vgroup-mapping: #事务组与TC服务cluster的映射关系 事务组怎么找到我的集群名称呢,那么将映射到default这个集群当中
      seata-demo: default #没有集群默认是 default nacos里   seata-demo 映射 到 default
  data-source-proxy-mode: XA #开启数据源代理的XA模式

 加上GlobalTransaction注解

  @GetMapping("/add")
    @GlobalTransactional(rollbackFor = Exception.class)
    public R addDingdan(@RequestParam("order") String order) {
        SpuInfoEntity spuInfoEntity = new SpuInfoEntity();
        spuInfoEntity.setSpuName(order);
        spuInfoService.save(spuInfoEntity);

        CouponHistoryEntity couponHistoryEntity = new CouponHistoryEntity();
        couponHistoryEntity.setMemberNickName(order);
        couponFeignService.saveTrySeata(couponHistoryEntity);


        return R.ok();
    }

另一个服务 也一定要加事务要不然会造成死锁

 @RequestMapping("/save")
    @Transactional(rollbackFor = Exception.class)
    public R save(@RequestBody CouponHistoryEntity couponHistory) throws Exception {
		couponHistoryService.save(couponHistory);

       CouponHistoryEntity couponHistory1 = new CouponHistoryEntity();
        BeanUtils.copyProperties(couponHistory,couponHistory1);
        couponHistory1.setId(1011L);
        couponHistoryService.save(couponHistory1);

        CouponHistoryEntity couponHistory2 = new CouponHistoryEntity();
        BeanUtils.copyProperties(couponHistory,couponHistory2);
        couponHistory2.setId(1012L);
		couponHistoryService.save(couponHistory2);

        CouponHistoryEntity couponHistory3 = new CouponHistoryEntity();
        BeanUtils.copyProperties(couponHistory,couponHistory3);
        couponHistory3.setId(1013L);
        couponHistoryService.save(couponHistory3);

        return R.ok();
    }

2.AT模式

1.AT模式原理

 2.AT模式的脏写问题

全局锁的机制

 

3. 代码实现

上面一开始就导入了 lock_table  到 seata服务器所在的数据库

然后只需要导入 undo_log 表就可以了 每个用到分布式事务的微服务所在的数据库都必须要导入一张undo_log表

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
seata:
  data-source-proxy-mode: AT 

重启一下就完事了,无代码侵入

测试回滚成功,和上面XA一样的代码不用动

3.TCC模式

1.TCC模式原理

 

 

2.TCC代码实现

 

 1.先建一张 冻结表
 CREATE table `pms_spu_info_tbl`( 
  `xid` varchar(128) NOT NULL,
  `user_id` varchar(255) Default null comment '用户ID',
  `freeze_money` int unsigned default '0' comment '冻结金额',
  `state` int default null comment '事务状态,0:try,1:confirm,2:cancel',
  Primary key (`xid`) USING BTREE
) ENGINE = InnoDB Default charset=utf8;
2.try业务

记录冻结金额和事务状态到 pms_spu_info_tbl

添加金额

3.confirm业务

根据xid 删除account_freeze表的冻结记录

4.cancel业务

减少金额 

根据xid 修改account_freeze回滚状态

state 2

5.如何判断是否空回滚
6.如何避免业务悬挂

3.声明TCC接口

 

package com.jmj.gulimall.product.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface AccountTCCService {

    @TwoPhaseBusinessAction(name="induct",commitMethod = "commit",rollbackMethod = "rollback")
    void induct(@BusinessActionContextParameter(paramName = "params") String params,
                @BusinessActionContextParameter(paramName = "money") int money);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}
package com.jmj.gulimall.product.tcc;

import com.jmj.gulimall.product.dao.PmsSpuInfoTblMapper;
import com.jmj.gulimall.product.entity.SpuInfoEntity;
import com.jmj.gulimall.product.service.SpuInfoService;
import com.jmj.gulimall.product.vo.spu.PmsSpuInfoTbl;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class AccountTCCServiceImpl implements AccountTCCService {

    @Autowired
    private SpuInfoService spuInfoService;

    @Autowired
    private PmsSpuInfoTblMapper pmsSpuInfoTblMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void induct(String params, int money) {
        //0.获取全局事务ID
        String xid = RootContext.getXID();
        PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
        if (pmsSpuInfoTbl != null) {
            return ;
        }


        //1.扣减可用余额
        SpuInfoEntity byId = spuInfoService.getById(9L);
        byId.setPublishStatus(byId.getPublishStatus() + money);
        spuInfoService.updateById(byId);

        //2.记录冻结金额 事务状态

        PmsSpuInfoTbl p = new PmsSpuInfoTbl();
        p.setUserId("123");
        p.setFreezeMoney(money);
        p.setState(0);
        p.setXid(xid);
        pmsSpuInfoTblMapper.insert(p);

    }

    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        //0.获取全局事务ID
        String xid = businessActionContext.getXid();

        //1.删除冻结金额
        return pmsSpuInfoTblMapper.deleteById(xid) == 1;
    }

    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        //0.获取全局事务ID
        String xid = businessActionContext.getXid();
        PmsSpuInfoTbl pmsSpuInfoTbl = pmsSpuInfoTblMapper.selectById(xid);
        if (pmsSpuInfoTbl != null) {
            if (pmsSpuInfoTbl.getState() ==2) {
                //幂等操作
                return true;
            }

            Integer freezeMoney = pmsSpuInfoTbl.getFreezeMoney();
            //1.扣减可用余额
            SpuInfoEntity byId = spuInfoService.getById(9L);
            byId.setPublishStatus(byId.getPublishStatus() - freezeMoney);
            spuInfoService.updateById(byId);

            pmsSpuInfoTbl.setState(2);
            pmsSpuInfoTblMapper.updateById(pmsSpuInfoTbl);
        }else {
            PmsSpuInfoTbl pmsSpuInfoTbl1 = new PmsSpuInfoTbl();
            String params = businessActionContext.getActionContext("params").toString();
            pmsSpuInfoTbl1.setUserId(params);
            pmsSpuInfoTbl1.setState(2);
            pmsSpuInfoTbl1.setFreezeMoney(0);
            pmsSpuInfoTbl1.setXid(xid);
            pmsSpuInfoTblMapper.insert(pmsSpuInfoTbl1);
        }
        return true;
    }
}
 @GetMapping("/testTcc")
    public R testTcc() {
        accountTCCService.induct("JMJ",12);
        return R.ok();
    }
package com.jmj.gulimall.coupon.controller;


import com.jmj.common.utils.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;


@FeignClient(value = "gulimall-product",path = "/product/spuinfo")
public interface PmsFeignService {

    @GetMapping("/testTcc")
    public R testTcc();
}
  @GetMapping("/tet/tcc")
    @GlobalTransactional(rollbackFor = Exception.class)
    public R tcc(){
        CouponEntity couponEntity = new CouponEntity();
        couponEntity.setCouponImg(UUID.randomUUID().toString());
        couponEntity.setCouponName(UUID.randomUUID().toString());
        couponService.save(couponEntity);

        pmsFeignService.testTcc();


        if (true){
            throw new RuntimeException("测试异常");
        }
        return R.ok();
    }

最终回滚

 能和AT模式混用

4.SAGA模式

类似于工作流 

 无隔离性

 5.SEATA高可用

1.实现微服务高可用和异地容灾

 本地模拟复制一份 文件夹 

把配置文件改成杭州集群

启动命令

seata-server.bat -p 8092

这样集群部署成功了

 若出现容灾的情况下

利用nacos的配置热更新 就可以了

可以新建一个通用客户端配置

client.yaml

官网有详细配置

这里用我自己的简略版

#事务组的映射关系
service.vgroupMapping.seata-demo=default

service.enableDegrade=false
service.disableGlobalTransaction=false

#与TC服务的通信配置

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#rm配置
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
#tm配置
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
#undo配置
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k

自动刷新 注解这里记录一下


/**
 * 优惠券信息
 *
 * @author jiangmingji
 * @email 123456789@qq.com
 * @date 2024-03-21 21:24:36
 */
@RefreshScope //动态从配置中心获取配置
@RestController
@RequestMapping("/coupon/coupon")
public class CouponController {
    @Autowired
    private CouponService couponService;


    @Value("${coupon.user.name}")
    private String name;
    @Value("${coupon.user.age}")
    private Integer age;

微服务读取配置

 这样给微服务 配上配置就可以了

seata:
  data-source-proxy-mode: AT #开启数据源代理的XA模式
  registry: #TC服务注册中心的配置,微服务根据这些信息取注册中心获取tc服务地址
    #参考tc服务自己的 registry.conf 中的配置,
    #包括:地址 namespace, group,application-name, cluster
    type: nacos
    nacos: #TC
      server-addr: 127.0.0.1:8848
      group: DEFAULT_GROUP
      namespace: "" #因为都在public 所以用空串
      application: seata-tc-server
      username: "nacos"
      password: "nacos"
  tx-service-group: seata-demo #事务组,根据这个获取tc服务的cluster名称 自己取的名称
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: 'SEATA_GROUP'
      dataId: 'client.properties'
      username: 'nacos'
      password: 'nacos'
      namespace: ""

重启服务 

修改配置

将集群切换为杭州 HZ 

 

成功注册到8092那个seata-server上面了 


http://www.kler.cn/a/499837.html

相关文章:

  • 二进制编码 和 Base64编码
  • 解锁 C# 与 LiteDB 嵌入式 NoSQL 数据库
  • JavaScript的输出
  • 浅谈容灾技术方案详解
  • 2025年实训总结
  • 大语言模型的前沿探索:从理论到实践的深度剖析
  • 深入理解计算机系统——优化程序性能(一)
  • 类加载器和双亲委派
  • 深度学习与机器学习的关系和差别?
  • CMD批处理命令入门(4)——ping,ipconfig,arp,start,shutdown,taskkill
  • 【Unity3D】利用IJob、Burst优化处理切割物体
  • Redis 多路复用(Multiplexing)
  • git相关操作笔记
  • LLM的实验平台有哪些:快速搭建测试大语言模型
  • 【Unity-和WPF结合的优势】
  • Pixel 6a手机提示无法连接移动网络,打电话失败!
  • 太原理工大学软件设计与体系结构 --javaEE
  • 算法 -归并排序
  • Linux:操作系统简介
  • Taro+Vue实现图片裁剪组件
  • pytest+allure 入门