结合seata和2PC,简单聊聊seata源码
当前代码分析基于seata1.6.1
整体描述
整体代码流程可以描述为
- TM开启全局事务,会调用TC来获取XID。
- TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
- 在获取到XID后,会执行业务逻辑。
- 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
- 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
- TC接收RM端提交的分支事务,存储到brand_table中。
- 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
- TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
- TC 通知RM,删除 undo log 日志。
源码解析
系统启动初始化
主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器
在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。
GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
//创建客户端的方法
initClient();
}
}
private void initClient() {
...其他校验
//创建TM客户端并且初始化
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
...
//创建RM客户端并且初始化
RMClient.init(applicationId, txServiceGroup);
...
}
同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。
wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
...
} else {
...
//生成一个全局事务处理的拦截器,
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
...
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
TM端开启全局事务
开启全局事务
GlobalTransactionalInterceptor中的逻辑
忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute
在该类中,核心代码如下
public Object execute(TransactionalExecutor business) throws Throwable {
...
try {
...
try {
//开启全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
//进入业务代码,执行业务逻辑
rs = business.execute();
} catch (Throwable ex) {
//当出现业务逻辑异常进行回滚
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
//所有分支事务无异常,提交全局事务
commitTransaction(tx, txInfo);
return rs;
} finally {
...
}
} finally {
...
}
}
开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id
TC端接收全局事务请求后
记录全局事务
在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
在该方法中会去调用core.begin方法,进入xid获取流程
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//开始获取xid
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
忽略其他流程,关注核心,其调用往下的链路为
core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法
在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。
方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession
然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现
在writeSession方法中,会插入全局事务表数据,代码如下
/**
* 插入全局事务表
* 表为:global_table
* @param globalTransactionDO the global transaction do
* @return
*/
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
//插入xid
ps.setString(index++, globalTransactionDO.getXid());
//插入事务id
ps.setLong(index++, globalTransactionDO.getTransactionId());
//插入事务状态,begin = 1
ps.setInt(index++, globalTransactionDO.getStatus());
//插入应用id,一般是服务名
ps.setString(index++, globalTransactionDO.getApplicationId());
//插入事务组
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
//插入事务名称
ps.setString(index++, transactionName);
//插入超时时间
ps.setInt(index++, globalTransactionDO.getTimeout());
//插入事务开始时间
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
RM执行业务代码,并且提交事务
代理数据源,生成undo log,并且通知TC
在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强
这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//一般在初始状态下,这个autoCommit是true
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下
executeAutoCommitTrue 方法代码
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
//设置提交方式为手动提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
//执行sql,并且准备前置镜像和后置镜像
T result = executeAutoCommitFalse(args);
//提交本地事务(内部会和RM进行交互)
connectionProxy.commit();
return result;
});
} catch (Exception e) {
...异常处理
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
executeAutoCommitFalse方法代码
protected T executeAutoCommitFalse(Object[] args) throws Exception {
//获取前置镜像
TableRecords beforeImage = beforeImage();
//执行目标sql,注意,这边执行完后,事务是未提交的
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//获取后置镜像
TableRecords afterImage = afterImage(beforeImage);
//准备undo_log
prepareUndoLog(beforeImage, afterImage);
return result;
}
processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法
private void processGlobalTransactionCommit() throws SQLException {
try {
//作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//生成undo_log日志,用于事务回滚
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//提交undo_log 回滚日志和本地事务,事务在这里已经提交了
targetConnection.commit();
} catch (Throwable ex) {
...异常处理
}
...其他处理
}
TM提交全局事务
进行提交就是向TC发起请求,相关代码如下
@Override
public void commit() throws TransactionException {
//判断当前角色,只有TM才能执行
if (role == GlobalTransactionRole.Participant) {
...
return;
}
//XID不能为空
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
//可重试的执行,最多可执行5次
while (retry > 0) {
try {
retry--;
//向tc发起调用
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
...
}
}
} finally {
...
}
...
}
TC处理全局事务
TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
//设置状态为异步提交状态
response.setGlobalStatus(core.commit(request.getXid()));
}
在AT模式下,事务的提交为异步的方式
public GlobalStatus commit(String xid) throws TransactionException {
//获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
//如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等
return GlobalStatus.Finished;
}
...
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
if (globalSession.getStatus() == GlobalStatus.Begin) {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
//判断是否可以异步提交,AT模式下可以异步提交
if (globalSession.canBeCommittedAsync()) {
//AT模式下异步事务提交
globalSession.asyncCommit();
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
return false;
} else {
...
}
}
return false;
});
...
}
最终将事务状态设置为异步提交
public void asyncCommit() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
//设置事务状态为异步提交,这里在设置为异步提交后就不管了
this.setStatus(GlobalStatus.AsyncCommitting);
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
}
TC异步执行全局事务commit
核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init
该方法会异步的去进行处理,每秒执行一次
/**
* Init.
*/
public void init() {
...
//异步处理的部分
//会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除
//操作完了之后,向RM进行通知,进行undo_log的删除
asyncCommitting.scheduleAtFixedRate(
() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
...
}
未完待续...
以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com