Redis篇--常见问题篇7--缓存一致性2(分布式事务框架Seata)
1、概述
在传统的单体应用中,事务管理相对简单,通常使用数据库的本地事务(如MySQL的BEGIN和COMMIT)来保证数据的一致性。然而,在微服务架构中,由于每个服务都有自己的数据库,跨服务的事务管理变得复杂。Seata正是为了解决这一问题而设计的,它提供了一种轻量级的分布式事务解决方案,能够在多个微服务之间协调事务,确保数据的一致性。
Seata是阿里巴巴开源的一款分布式事务解决方案,旨在解决微服务架构中的分布式事务问题。它提供了ACID(原子性、一致性、隔离性、持久性)事务的分布式实现,确保在多个微服务之间进行跨服务调用时,事务能够保持一致性和可靠性。
2、Seata的4种工作模式
AT(Automatic Transaction Mode)模式:
AT模式是Seata 默认的分布式事务模式,它基于SQL解析和拦截器机制,自动处理全局事务,无需我们编码,适用于大多数关系型数据库(如MySQL、Oracle、PostgreSQL等)。在AT模式下,Seata会自动记录数据库操作的前后镜像(before image和after image),并在事务提交或回滚时进行数据的一致性检查。开发者不需要编写额外的补偿逻辑,适合大多数场景。也是Seata最常用的一种模式。
TCC(Try-Confirm-Cancel Mode)模式:
TCC模式一般用于适合对性能要求较高的场景。是一种显式的两阶段提交协议,开发者需要为每个业务操作定义三个方法:
- try:尝试执行业务操作,预留资源。
- confirm:确认操作,正式提交资源。
- cancel:取消操作,回滚资源。
TCC模式适用于复杂的业务场景,尤其是跨多个服务的分布式事务。开发者需要手动编写补偿逻辑,确保事务的一致性。
SAGA(Saga Orchestration Mode)模式:
SAGA模式是一种长事务模式,是基于状态机驱动的分布式事务。适用于涉及多个服务需要长时间运行的业务流程。SAGA模式将整个业务流程拆分为多个子事务,每个子事务都有对应的补偿操作。如果某个子事务失败,SAGA会依次执行前面子事务的补偿操作,确保事务的一致性。
SAGA 模式有两种实现方式:
- Orchestration(编排模式):由一个中央控制器(Orchestrator)负责协调各个子事务的执行。
- Choreography(编舞模式):各个子事务通过事件驱动的方式协同工作,没有中央控制器。
XA(X/Open XA Mode)模式:
XA模式是传统的两阶段提交协议,兼容XA规范,广泛应用于传统的关系型数据库系统。Seata支持XA模式,允许开发者使用XA协议来管理分布式事务。XA模式依赖于数据库的XA支持,因此需要数据库和JDBC驱动的支持。适合对一致性要求极高的场景,但性能较低。
3、Seata的核心特性
(1)、支持多种分布式事务模式(AT,TCC,SAGA,XA)。
(2)、轻量级:Seata不依赖于任何特定的中间件或框架,可以与Spring Cloud、Dubbo等微服务框架无缝集成。
(3)、高可用性和扩展性:Seata的事务协调器(TC)和事务管理器(TM)都可以水平扩展,支持集群部署,确保系统的高可用性和可扩展性。
(4)、多语言支持:Seata支持多种编程语言,包括Java、Go、C等,方便不同技术栈的团队使用。
(5)、丰富的生态:Seata提供了与主流数据库、消息队列、缓存等组件的集成,能够满足复杂的业务需求。
4、两阶段提交
XA协议的核心是两阶段提交(Two-Phase Commit,2PC)机制:
(1)、第一阶段(准备阶段):
全局事务协调者向所有参与者发送准备请求,每个参与者执行本地事务并返回确认信息给协调者。如果所有参与者都返回成功,那么协调器进入第二阶段。如果任何一个参与者返回失败,协调者将发送回滚请求。
(2)、第二阶段(提交/回滚阶段):
全局事务协调者根据第一阶段的反馈情况,决定提交或回滚全局事务。首先,协调者发送提交或回滚指令给所有参与者,然后参与者按照指令执行相应的操作。
通过两阶段提交,XA协议保证了分布式事务的原子性和一致性。每个参与者在第一阶段确认准备之后,就无法单独执行本地事务的提交或回滚操作,而是依赖于全局事务协调的指令。
简单说:
第一阶段是所有服务执行各自的事务代码并记录了执行结果。
第二阶段是事务协调者根据所有服务第一阶段的结果决定是否提交全局事务。第一阶段中每一个服务都记录了自身的undo_log日志,在第二阶段中,事务协调者如果发现有一个服务失败了,就会通知所有的服务执行undo_log日志,实现全局回滚。如果全部服务都成功了,那么就直接提交全局事务,结束本次事务处理。不论第二阶段是成功还是失败,TC(事务协调器)都会清理本次全局事务的状态,并释放资源。
5、Seata的架构及原理
(1)、架构
Seata的工作原理基于“三者分离”的架构设计,分别是:
事务协调器(Transaction Coordinator, TC):
- 负责维护全局事务的状态,协调各个分支事务的提交或回滚。
- 作为独立的服务部署,通常是一个集群,确保高可用性和容错能力。
事务管理器(Transaction Manager, TM):
- 负责开启全局事务,并向TC注册全局事务。
- 当所有分支事务完成后,TM会根据业务逻辑决定是提交还是回滚全局事务。
资源管理器(Resource Manager, RM):
- 负责管理分支事务,通常是微服务中的数据库连接。
- RM会将本地事务注册到TC,并在全局事务提交或回滚时执行相应的操作。
架构示例如下:
简单理解下:
TC就相当于我们部署的Seata服务,用于管理所有的全局事务信息。TM就相当于我们用注解标注的方法,RM内就相当于注解方法内使用的每一个微服务的任务。
当执行到标识全局事务的方法时,TM就会往TC注册全局事务的信息并开启全局事务,同时会通知TC本次全局任务有哪些RM的任务。
之后程序会运行方法中的代码,执行每一个RM任务并记录执行结果。TC会监听这些子任务的结果,如果全部RM都运行成功,全局任务就会通过。如果有一个失败,就会通知所有的RM去回滚服务。
(2)、以AT模式为例,Seata的工作流程如下
第一步:开启全局事务
- TM向TC发起请求,开启一个新的全局事务,并获取全局事务ID(XID)。
第二步:执行分支事务
- 在每个微服务中,RM会拦截SQL语句,解析并生成对应的undo_log,用于记录事务的回滚信息。
- RM将本地事务注册到TC,并将XID传递给下游服务。
第三步:执行全局事务(提交或回滚)
- 当所有分支事务执行完毕后,TM根据业务逻辑决定是提交还是回滚全局事务。
- 如果提交,TC会通知所有RM提交本地事务;如果回滚,TC会通知所有RM使用undo_log回滚本地事务。
第四步:清理资源
- 全局事务提交或回滚后,TC会清理相关的事务状态,释放资源。
6、Seata的优势
- 无侵入性:在AT模式下,Seata可以通过AOP(面向切面编程)自动拦截SQL语句,生成undo_log,而不需要修改业务代码。这使得开发者可以专注于业务逻辑,而不必关心事务的实现细节。
- 高性能:Seata的AT模式基于SQL解析和两阶段提交,避免了传统XA协议的性能瓶颈。它只在必要的时候生成undo_log,减少了对数据库的压力。
- 强一致性:Seata通过两阶段提交协议,确保了分布式事务的强一致性。即使某个分支事务失败,整个全局事务也会回滚,保证数据的一致性。
- 灵活的事务模式:Seata提供了多种事务模式(AT、TCC、SAGA、XA),可以根据不同的业务场景选择最适合的模式。例如,对于简单的CRUD操作,可以使用AT模式;对于复杂的业务流程,可以使用TCC或SAGA模式。
7、Seata的应用场景
- 微服务之间的数据一致性:当多个微服务需要协同完成一个业务操作时,Seata可以确保这些服务之间的数据一致性。例如,订单系统和库存系统之间的扣减库存操作。
- 跨库事务:当同一个微服务需要操作多个数据库时,Seata可以确保这些数据库之间的事务一致性。例如,一个微服务需要操作更新用户库和订单库。
- 异步事务:Seata的SAGA模式支持长事务,适用于需要长时间运行的业务流程。例如,复杂的审批流程或订单处理流程。
- 跨服务的消息队列事务:Seata可以与消息队列(如RocketMQ、Kafka)结合使用,确保消息的发送和接收在一个全局事务中。例如,订单创建成功后,发送一条消息通知库存系统扣减库存。
8、Seata使用示例
(1)、安装,配置及使用
主要包括以下几个步骤:
第一步:下载
- 你可以从Seata官方GitHub下载最新的Seata Server。
目录如:
第二步:配置Seata Server- 编辑conf/file.conf文件,配置Seata Server的存储方式(如文件存储、数据库存储等)。
- 编辑registry.conf文件,配置Seata的注册中心(如Nacos、Eureka、Zookeeper等)。
第三步:启动服务
在bin目录下有启动脚本,解压后启动seata-server.sh或seata-server.bat。
第四步:配置客户端:
- 在微服务中引入Seata的依赖项(如seata-spring-boot-starter),并在application.yml中配置Seata的相关参数,如事务模式、Seata Server地址等。
第五步:编写业务代码:
- 在业务代码中使用@GlobalTransactional注解来开启全局事务。Seata会自动拦截SQL语句,生成undo_log,并协调各个分支事务的提交或回滚。
(2)、四种模式代码示例
AT模式(Automatic Transaction Mode)
概述:
AT模式是Seata默认的分布式事务模式,它基于SQL解析和拦截器机制,自动处理全局事务。在AT模式下,Seata会自动记录数据库操作的前后镜像(before image和after image),并在事务提交或回滚时进行数据的一致性检查。开发者不需要编写额外的补偿逻辑,适合大多数场景。
适用场景:
- 适用于标准的SQL操作,如INSERT、UPDATE、DELETE。
- 不需要手动编写补偿逻辑,适合对事务一致性要求较高的场景。
代码示例
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountService accountService;
// 创建订单并扣减账户余额
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrder(Long userId, Double amount) {
try {
// 1. 扣减账户余额
accountService.decreaseBalance(userId, amount);
// 2. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("CREATED");
orderRepository.save(order);
// 3. 模拟异常(用于测试回滚)
// if (true) throw new RuntimeException("Simulated error");
System.out.println("Order created successfully.");
} catch (Exception e) {
System.err.println("Error creating order: " + e.getMessage());
throw e; // 抛出异常以触发事务回滚
}
}
}
解释:
- @GlobalTransactional:标记该方法为全局事务,确保所有参与的方法(如:扣减账户余额和创建订单)都在同一个事务中执行。
- 自动处理:Seata会自动记录SQL操作的前后镜像,并在事务提交或回滚时进行数据的一致性检查。
- 无需补偿逻辑:开发者不需要编写额外的补偿逻辑,Seata会自动处理事务的提交和回滚。
TCC模式(Try-Confirm-Cancel Mode)
概述
TCC模式是一种显式的两阶段提交协议,开发者需要为每个业务操作定义三个方法:
- try:尝试执行业务操作,预留资源。
- confirm:确认操作,正式提交资源。
- cancel:取消操作,回滚资源。
TCC模式适用于复杂的业务场景,尤其是跨多个服务的分布式事务。开发者需要手动编写补偿逻辑,确保事务的一致性。
适用场景
- 适用于复杂的业务场景,尤其是跨多个服务的分布式事务。
- 需要手动编写补偿逻辑,适合对性能和灵活性有较高要求的场景。
代码示例
import com.alibaba.fastjson.JSONObject;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public class AccountService {
// Try 阶段:尝试扣减账户余额
// 通常在try阶段通过使用二阶段提交注解,指定提交的方法和回滚的方法,如下:
@TwoPhaseBusinessAction(name = "decreaseBalance", commitMethod = "confirmDecreaseBalance", rollbackMethod = "cancelDecreaseBalance")
public boolean tryDecreaseBalance(@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "amount") Double amount) {
// 检查账户余额是否足够
if (checkBalance(userId, amount)) {
// 预留资源(例如,冻结金额)
freezeBalance(userId, amount);
return true;
}
return false;
}
// Confirm 阶段:正式扣减账户余额
public boolean confirmDecreaseBalance(BusinessActionContext actionContext) {
// 从上下文中获取参数
JSONObject params = actionContext.getActionContext();
Long userId = params.getLong("userId");
Double amount = params.getDouble("amount");
// 正式扣减余额
decreaseBalance(userId, amount);
return true;
}
/ / Cancel 阶段:取消扣减账户余额
public boolean cancelDecreaseBalance(BusinessActionContext actionContext) {
// 从上下文中获取参数
JSONObject params = actionContext.getActionContext();
Long userId = params.getLong("userId");
Double amount = params.getDouble("amount");
// 取消预留的资源(例如,解冻金额)
unfreezeBalance(userId, amount);
return true;
}
// 辅助方法
private boolean checkBalance(Long userId, Double amount) {
// 检查账户余额是否足够
return true; // 简化示例
}
private void freezeBalance(Long userId, Double amount) {
// 冻结余额
System.out.println("Freezing balance for user " + userId + " with amount " + amount);
}
private void decreaseBalance(Long userId, Double amount) {
// 扣减余额
System.out.println("Decreasing balance for user " + userId + " with amount " + amount);
}
private void unfreezeBalance(Long userId, Double amount) {
// 解冻余额
System.out.println("Unfreezing balance for user " + userId + " with amount " + amount);
}
}
解释:
- @LocalTCC:标记该类为 TCC 模式的本地事务参与者。
- @TwoPhaseBusinessAction:定义TCC模式的三步操作:try、confirm和cancel。
- 手动编写补偿逻辑:开发者需要为每个业务操作编写try、confirm和cancel方法,确保事务的一致性。
- 资源预留:try阶段只预留资源,不真正修改数据;confirm阶段正式提交资源;cancel阶段回滚资源。
SAGA模式(Saga Orchestration Mode)
概述
SAGA模式是一种长事务模式,适用于涉及多个服务的复杂业务流程。SAGA模式将整个业务流程拆分为多个子事务,每个子事务都有对应的补偿操作。如果某个子事务失败,SAGA会依次执行前面子事务的补偿操作,确保事务的一致性。
SAGA 模式有两种实现方式:
- Orchestration(编排模式):由一个中央控制器(Orchestrator)负责协调各个子事务的执行。
- Choreography(编舞模式):各个子事务通过事件驱动的方式协同工作,没有中央控制器。
适用场景
- 适用于涉及多个服务的复杂业务流程。
- 适合长事务场景,允许部分子事务成功后继续执行后续操作。
- 适合对最终一致性要求较高的场景。
代码示例:(Orchestration模式)
import io.seata.saga.statelang.domain.StateMachineInstance;
import io.seata.saga.statelang.domain.StateMachineStatus;
import io.seata.saga.statelang.service.StateMachineEngine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SagaOrderService {
@Autowired
private StateMachineEngine stateMachineEngine;
// 创建订单并扣减账户余额
public void createOrder(Long userId, Double amount) {
// 定义SAGA流程的状态机ID
String stateMachineId = "orderStateMachine";
// 创建状态机实例
StateMachineInstance instance = new StateMachineInstance();
instance.setStateMachineId(stateMachineId);
instance.setBizKey("order_" + System.currentTimeMillis());
// 设置输入参数
instance.setInputData(new JSONObject().fluentPut("userId", userId).fluentPut("amount", amount));
// 启动SAGA流程
StateMachineStatus status = stateMachineEngine.start(instance);
if (status == StateMachineStatus.EXECUTED) {
System.out.println("SAGA流程执行成功。");
} else {
System.err.println("SAGA流程执行失败。");
}
}
}
解释
- StateMachineEngine:SAGA模式的状态机引擎,负责协调各个子事务的执行。
- StateMachineInstance:表示SAGA流程的一个实例,包含状态机ID、业务键和输入参数。
- start():启动SAGA流程,执行一系列子事务。
- 补偿机制:如果某个子事务失败,SAGA会自动执行前面子事务的补偿操作,确保事务的一致性。
XA模式(X/Open XA Mode)
概述
XA模式是传统的两阶段提交协议,广泛应用于传统的关系型数据库系统。Seata支持XA模式,允许开发者使用XA协议来管理分布式事务。XA模式依赖于数据库的XA支持,因此需要数据库和JDBC驱动的支持。
适用场景
- 适用于传统的企业级应用,尤其是已经使用XA协议的系统。
- 适合对事务一致性要求极高的场景,但性能相对较低。
代码示例
XA是比较传统的方式,功能实现的话和AT模式相似,通常还是建议使用AT模式,更加简单和方便。
import io.seata.rm.datasource.DataSourceProxy;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@Configuration
@EnableTransactionManagement
public class XAConfig {
@Autowired
private DataSource dataSource;
// 配置XA模式的数据源代理
@Bean
public DataSource xaDataSource() {
return new DataSourceProxy(dataSource);
}
// 配置事务管理器
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(xaDataSource());
}
}
// 使用 XA 模式的业务逻辑
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountService accountService;
// 创建订单并扣减账户余额
@Transactional
public void createOrder(Long userId, Double amount) {
try {
// 1. 扣减账户余额
accountService.decreaseBalance(userId, amount);
// 2. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("CREATED");
orderRepository.save(order);
// 3. 模拟异常(用于测试回滚)
// if (true) throw new RuntimeException("Simulated error");
System.out.println("Order created successfully.");
} catch (Exception e) {
System.err.println("Error creating order: " + e.getMessage());
throw e; // 抛出异常以触发事务回滚
}
}
}
解释
- DataSourceProxy:Seata提供的数据源代理,用于包装原有的数据源,使其支持XA协议。
- @Transactional:标记该方法为事务性方法,确保所有操作都在同一个事务中执行。
- 依赖XA支持:XA模式依赖于数据库和JDBC驱动的支持,因此需要确保使用的数据库和驱动程序支持XA协议。
(3)、四种模式的区别
9、Seata实现缓存一致性
上面四种工作模式的介绍中,其实比较常用的也就是AT模式和TCC模式。
我们在回头看下缓存一致性的问题,想要想实现Redis和Mysql的分布式事务。AT模式显然不合适,AT模式通常是通过undo_log实现回滚,也就是每个任务都是数据库任务才行。
TCC模式刚好能实现,通过TCC模式,可以自定义实现回滚方法,可以保证Redis和mysql的一致性,但方法千万不能写错了啊。
代码示例:
Service调用层类
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OrderTcc orderTcc;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrder(Long userId, Long productId, int quantity) {
// 调用 TCC 模式的 Try 接口
orderTcc.tryCreateOrder(userId, productId, quantity);
}
}
实现类
import com.alibaba.ttl.TransmittableThreadLocal;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@LocalTCC // TCC模式
public class OrderTcc {
@Autowired
private OrderRepository orderRepository; // MySQL 订单表的操作
@Autowired
private RedisTemplate<String, Object> redisTemplate; // Redis 操作
// Try 接口:预留资源
@TwoPhaseBusinessAction(name = "create-order")
public boolean tryCreateOrder(@BusinessActionContextParameter("userId") Long userId,
@BusinessActionContextParameter("productId") Long productId,
@BusinessActionContextParameter("quantity") int quantity) {
// 1. 在 MySQL 中插入订单(预留资源)
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus("PENDING"); // 订单状态为待确认
orderRepository.save(order);
// 2. 在 Redis 中设置临时键,表示库存已预留
String tempKey = "temp:stock:" + productId;
redisTemplate.opsForValue().set(tempKey, quantity, 60, TimeUnit.SECONDS); // 设置 60 秒过期时间
return true;
}
// Confirm 接口:确认提交
public boolean commit(BusinessActionContext actionContext) {
// 1. 获取订单 ID
Long orderId = (Long) actionContext.getActionContext("orderId");
// 2. 更新 MySQL 中的订单状态为已完成
orderRepository.updateStatusById(orderId, "COMPLETED");
// 3. 在 Redis 中设置正式的库存缓存
Long productId = (Long) actionContext.getActionContext("productId");
int quantity = (int) actionContext.getActionContext("quantity");
redisTemplate.opsForValue().set("stock:" + productId, quantity);
// 4. 删除临时键
redisTemplate.delete("temp:stock:" + productId);
return true;
}
// Cancel 接口:回滚资源
public boolean rollback(BusinessActionContext actionContext) {
// 1. 获取订单 ID
Long orderId = (Long) actionContext.getActionContext("orderId");
// 2. 回滚 MySQL 中的订单
orderRepository.deleteById(orderId);
// 3. 删除 Redis 中的临时键
Long productId = (Long) actionContext.getActionContext("productId");
redisTemplate.delete("temp:stock:" + productId);
return true;
}
}