Java分布式事务(九)
文章目录
- 🔥XA强一致性分布式事务实战_Atomikos介绍
- 🔥XA强一致性分布式事务实战_业务说明
- 🔥XA强一致性分布式事务实战_项目搭建
- 🔥XA强一致性分布式事务实战_多数据源实现
- 🔥XA强一致性分布式事务实战_业务层实现
🔥XA强一致性分布式事务实战_Atomikos介绍
简单介绍
Atomikos(https://www.atomikos.com/),其实是一家公司的名字,提供了基于 JTA规范的XA分布式事务TM的实现 。其旗下最著名的产品就是事务管理器。
产品分两个版本:
⭐TransactionEssentials:开源的免费产品;
⭐ExtremeTransactions:上商业版,需要收费。
这两个产品的关系如下图所示:
ExtremeTransactions 在 TransactionEssentials 的基础上额外提供了以下功能:
⭐支持 TCC:这是一种柔性事务
⭐支持通过 RMI、IIOP、SOAP 这些远程过程调用技术,进行事务传播。
⭐事务日志云存储,云端对事务进行恢复,并且提供了完善的管理后台。
什么是JTA
Java事务API(JTA:Java Transaction API)和它的同胞Java事务服务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)的能力。
⭕
要想使用用 JTA 事务,那么就需要有一个实现
javax.sql.XADataSource 、 javax.sql.XAConnection 和javax.sql.XAResource 接口的 JDBC 驱动程序。
一个实现了这些接口的驱动程序将可以参与 JTA 事务。
一个 XADataSource 对象就是一个 XAConnection 对象的工厂。
XAConnection 是参与JTA 事务的 JDBC 连接。
🔥XA强一致性分布式事务实战_业务说明
场景介绍
本案例使用Atomikos框架实现XA强一致性分布式事务,模拟跨库转账的业务场景。不同账户之间的转账操作通过同一个项目程序完成。
说明:
转账服务不会直接连接数据库进行转账操作,而是通过Atomikos框架对数据库连接进行封装,通过Atomikos框架操作不同的数据库。由于Atomikos框架内部实现了XA分布式事务协议,因此转账服务的逻辑处理不用关心分布式事务是如何实现的,只需要关注具体的业务逻辑。
框架选型
框架名字 | 版本 |
---|---|
MySQL | 5.7 |
JDK | 1.8 |
微服务框架 | SpringBoot 2.6.3 |
分布式事务框架 | Atomikos |
持久层框架 | Mybatis plus |
user_account数据表结构
设计完数据表后,在192.168.66.100服务器创建2个数据库,分别为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。
DROP TABLE IF EXISTS user_account;
CREATE TABLE user_account (
account_no varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户编号'
,
account_name varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名称',
account_balance decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
PRIMARY KEY (account_no) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;
添加数据
tx-xa-01库中添加数据。
INSERT INTO `user_account` VALUES ('1001',
'张三', 10000.00);
tx-xa-02库中添加数据。
INSERT INTO `user_account` VALUES ('1002',
'李四', 10000.00);
🔥XA强一致性分布式事务实战_项目搭建
创建atomikos-xa项目
创建依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterjta-atomikos</artifactId>
</dependency>
<!-- druid连接池依赖组件-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-bootstarter</artifactId>
<version>1.1.22</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-bootstarter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
编写配置文件
server:
port: 6003
spring:
autoconfigure:
#停用druid连接池的自动配置
exclude:
com.alibaba.druid.spring.boot.autoconfigure.Dru
idDataSourceAutoConfigure
datasource:
#选用druid的XADataSource数据源,因为这个数据源支
持分布式事务管理
type:
com.alibaba.druid.pool.xa.DruidXADataSource
#以下是自定义字段
dynamic:
primary: master
datasource:
master:
url:
jdbc:mysql://192.168.66.102:3306/tx-xa-01?
useUnicode=true&characterEncoding=utf-
8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
username: root
password: 123456
driver-class-name:
com.mysql.jdbc.Driver
slave:
url:
jdbc:mysql://192.168.66.102:3306/tx-xa-02?
useUnicode=true&characterEncoding=utf-
8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
username: root
password: 123456
driver-class-name:
com.mysql.jdbc.Driver
validation-query: SELCET 1
编写主启动类
@Slf4j
@SpringBootApplication
@EnableTransactionManagement(proxyTargetClass = true)
public class TxXaStarter {
public static void main(String[] args){
SpringApplication.run(TxXaStarter.class,args);
log.info("*************** TxXaStarter
*********");
}
}
🔥XA强一致性分布式事务实战_多数据源实现
创建第一个数据源的配置类DBConfig1
@Data
@ConfigurationProperties(prefix =
"spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
private String url;
private String username;
private String password;
private String dataSourceClassName;
}
创建第二个数据源的配置类DBConfig2
@Data
@ConfigurationProperties(prefix =
"spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
private String url;
private String username;
private String password;
private String dataSourceClassName;
}
创建持久层接口
分别在com.it.mapper1包和com.it.mapper2包下
创建UserAccount1Mapper接口和UserAccount2Mapper接口。
public interface UserAccount1Mapper extends
BaseMapper<UserAccount> {
}
public interface UserAccount2Mapper extends
BaseMapper<UserAccount> {
}
创建MyBatisConfig1类
MyBatisConfig1类的作用是整合Atomikos框架,读取DBConfig1类中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。
@Configuration
@MapperScan(basePackages =
"com.it.mapper1", sqlSessionTemplateRef
= "masterSqlSessionTemplate")
public class MyBatisConfig1 {
@Bean(name = "masterDataSource")
public DataSource
masterDataSource(DBConfig1 dbConfig1) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setUniqueResourceName("masterDataSource");
sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName());
sourceBean.setTestQuery("select 1");
sourceBean.setBorrowConnectionTimeout(3);
MysqlXADataSource dataSource = new
MysqlXADataSource();
dataSource.setUser(dbConfig1.getUsername());
dataSource.setPassword(dbConfig1.getPassword()
);
dataSource.setUrl(dbConfig1.getUrl());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
@Bean(name = "masterSqlSessionFactory")
public SqlSessionFactory
masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
MybatisSqlSessionFactoryBean
sqlSessionFactoryBean = new
MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource
);
return
sqlSessionFactoryBean.getObject();
}
@Bean(name = "masterSqlSessionTemplate")
public SqlSessionTemplate
masterSqlSessionTemplate(@Qualifier("masterSqlS
essionFactory") SqlSessionFactory
sqlSessionFactory){
return new
SqlSessionTemplate(sqlSessionFactory);
}
}
创建MyBatisConfig2类
MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。
@Configuration
@MapperScan(basePackages =
"com.it.mapper2", sqlSessionTemplateRef
= "slaveSqlSessionTemplate")
public class MyBatisConfig2 {
@Bean(name = "slaveDataSource")
public DataSource slaveDataSource(DBConfig2 dbConfig2) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setUniqueResourceName("slaveDataSource");
sourceBean.setXaDataSourceClassName(dbConfig2.
getDataSourceClassName());
sourceBean.setTestQuery("select 1");
sourceBean.setBorrowConnectionTimeout(3);
MysqlXADataSource dataSource = new
MysqlXADataSource();
dataSource.setUser(dbConfig2.getUsername());
dataSource.setPassword(dbConfig2.getPassword()
);
dataSource.setUrl(dbConfig2.getUrl());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
@Bean(name = "slaveSqlSessionFactory")
public SqlSessionFactory
slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean
sqlSessionFactoryBean = new
MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSoure
);
return
sqlSessionFactoryBean.getObject();
}
@Bean(name = "slaveSqlSessionTemplate")
public SqlSessionTemplate
slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory
sqlSessionFactory){
return new
SqlSessionTemplate(sqlSessionFactory);
}
}
🔥XA强一致性分布式事务实战_业务层实现
项目的业务逻辑层主要实现具体的跨库转账的业务逻辑,由于具体的XA跨库分布式事务是由Atomikos框架内部实现的,因此在业务逻辑层处理跨库转账的逻辑时,就像操作本地数据库一样简单。
创建UserAccount类
@Data
@TableName("user_account")
@AllArgsConstructor
@NoArgsConstructor
public class UserAccount implements
Serializable {
private static final long serialVersionUID = 6909533252826367496L;
/**
* 账户编号
*/
@TableId
private String accountNo;
/**
* 账户名称
*/
private String accountName;
/**
* 账户余额
*/
private BigDecimal accountBalance;
}
创建UserAccountService接口
public interface UserAccountService {
/**
* 跨库转账
* @param sourceAccountNo 转出账户
* @param targetSourceNo 转入账户
* @param bigDecimal 金额
*/
void transferAccounts(String sourceAccountNo, String targetSourceNo,
BigDecimal transferAmount);
}
实现UserAccountService接口
package com.it.service.impl;
import com.it.entity.UserAccount;
import com.it.mapper1.UserAccountMapper1;
import com.it.mapper2.UserAccountMapper2;
import com.it.service.IUserAccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import
org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-13
*/
@Service
public class UserAccountServiceImpl implements
IUserAccountService {
@Autowired
private UserAccountMapper1
userAccountMapper1;
@Autowired
private UserAccountMapper2
userAccountMapper2;
/**
* 跨库转账
* @param sourceAccountNo 源账户
* @param targetSourceNo 目标账户
* @param bigDecimal 金额
*/
@Transactional
@Override
public void transofer(String
sourceAccountNo, String targetSourceNo,
BigDecimal bigDecimal) {
// 1. 查询原账户
UserAccount sourceUserAccount =
userAccountMapper1.selectById(sourceAccountNo);
// 2. 查询目标账户
UserAccount targetUserAccount =
userAccountMapper2.selectById(targetSourceNo);
// 3. 判断转入账户和转出账户是否为空
if (sourceAccountNo != null &&
targetUserAccount != null){
// 4. 判断转出账户是否余额不足
if
(sourceUserAccount.getAccountBalance().compare To(bigDecimal) < 0){
throw new RuntimeException("余额不足");
}
// 5.更新金额
sourceUserAccount.setAccountBalance(sourceUserAccount.getAccountBalance().subtract(bigDecimal));
// 6.张三账户减金额
userAccountMapper1.updateById(sourceUserAccount);
System.out.println(10/0);
// 7.更新金额
targetUserAccount.setAccountBalance(targetUserAccount.getAccountBalance().add(bigDecimal));
// 8.张三账户减金额
userAccountMapper2.updateById(targetUserAccount);
}
}
}