Springboot整合RocketMQ分布式事务
RocketMQ分布式事务
- rocketMQ5.0官方文档
- 案例源码地址
- 数据库初始化
- 创建user_order和user_points
- POM依赖
- 配置文件
- 事务消息处理流程
- RocketMQLocalTransactionListener源码
- 整体业务逻辑如下
- 代码如下
- Producer 发送事务消息
- MQ Server回应消息发送成功
- 消息投递
- 事务回查
- MQ Server回应消息发送成功和消息回查都是通过实现RocketMQLocalTransactionListener接口进行实现完整代码就是OrderListener
- 消费者
rocketMQ5.0官方文档
https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/
案例源码地址
源码地址 https://gitee.com/Lin-seven/rocket-shiwu
数据库初始化
创建user_order和user_points
CREATE TABLE `user_order` (
`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_code` varchar(50) DEFAULT NULL COMMENT '用户ID',
`order_points` bigint DEFAULT NULL COMMENT '订单积分',
PRIMARY KEY (`order_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb3;
CREATE TABLE `user_points` (
`user_id` bigint NOT NULL AUTO_INCREMENT COMMENT 'user_id',
`order_id` bigint DEFAULT NULL COMMENT '订单id',
`points` bigint DEFAULT NULL COMMENT '积分',
PRIMARY KEY (`user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
POM依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.7</version>
</dependency>
<!--Mysql数据库驱动8 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
配置文件
spring:
application:
name: rocketmq_demo
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/rocketmq_demo?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
username: root
password: 123456
# ========================mybatis===================
mybatis-plus:
configuration:
# 指定 MyBatis 使用 SLF4J 日志实现
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
rocketmq:
# NameServer
name-server: 127.0.0.1:9876
producer:
# 发送消息超时时间,默认3000
send-message-timeout: 30000
# 生产者组
group: groupTest
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
事务消息处理流程
Producer 即MQ发送方,本例中是用户服务,负责新增订单。MQ订阅方即消息消费方,本例中是积分服务,负责
新增积分。
1、Producer 发送事务消息
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注
意此时这条消息消费者(MQ订阅方)是无法消费到的。 本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
3、Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。 本例中,Producer 执行添加用户操作。
4、消息投递
若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积
分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息; 若Producer
本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删 除”增加积分消息“
。 MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即
程序执行正常则自动回应ack。
5、事务回查
如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer
来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此 只需关注本地事务的执行状态即可。
RocketMQLocalTransactionListener源码
public interface RocketMQLocalTransactionListener {
/**
‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
整体业务逻辑如下
订单创建成功,用户增加积分
代码如下
Producer 发送事务消息
@RestController
@RequestMapping("/order")
public class OrderController2 {
@Resource
RocketMQTemplate rocketMqTemplate;
@PostMapping("/orderAndPoints")
public String orderAndPoints(@RequestBody UserOrder order){
TransactionSendResult transactionSendResult = rocketMqTemplate.sendMessageInTransaction(
"transaction-topic",
MessageBuilder.withPayload(order).build(),
null);
//消息发送状态
// SendStatus sendStatus = transactionSendResult.getSendStatus();
// //本地事务状态
// String localState = transactionSendResult.getLocalTransactionState().name();
//
// System.out.println("发送状态:{},本地事务执行状态:{}"+ sendStatus+ " " + localState);
return order.toString();
}
}
MQ Server回应消息发送成功
通过实现RocketMQLocalTransactionListener中的executeLocalTransaction 方法 如果在 controller 中orderAndPoints rocketMQ的事务消息发送成功,会自动监听executeLocalTransaction方法
在executeLocalTransaction 方法中进行本地事务的执行,例如创建订单等。如果不发生异常自然是进行返回COMMIT
事务状态
@Component
@RocketMQTransactionListener
public class OrderListener implements RocketMQLocalTransactionListener {
@Resource
OrderMapper orderMapper;
/**
* 消息发送成功执行 此方法本地事务
* 事务消息发送成功本方法被回调
* @param message
* @param o
* @return
*/
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("事务消息发送成功,开始指定本地事务");
UserOrder payload = (UserOrder) message.getPayload();
orderMapper.insert(payload);
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 回查事务消息
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
UserOrder payload = (UserOrder) message.getPayload();
UserOrder userOrder = orderMapper.selectById(payload.getOrderId());
if (userOrder == null){
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("检查本地事务消息"+ payload.toString());
return RocketMQLocalTransactionState.COMMIT;
}
}
消息投递
如通过executeLocalTransaction 返回的是COMMIT 事务状态,则消息“ 状态标记为可消费。
事务回查
通过RocketMQLocalTransactionListener中的checkLocalTransaction 进行事务回查,比如订单是否被添加到数据库,或者本地消息表是否有记录等,回查状态进行返回事务状态
MQ Server回应消息发送成功和消息回查都是通过实现RocketMQLocalTransactionListener接口进行实现完整代码就是OrderListener
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-topic")
public class TransactionMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("事务消息消费成功,消息内容:{}", message);
}
}