RocketMQ事务消息原理
RocketMQ事务消息原理
RocketMQ的事务消息是一种确保消息与本地事务一致性的机制,广泛应用于分布式系统中,尤其是需要保证跨服务数据一致性的场景。其核心原理是在生产者发送消息到Broker时,先发送一个“半消息”,然后生产者执行本地事务。如果事务成功,生产者会发送提交(Commit)请求,Broker将半消息标记为正常消息;若失败,则发送回滚(Rollback)请求,Broker丢弃该消息。如果生产者未及时确认,Broker会进行事务回查,确保消息的一致性处理。通过这种机制,RocketMQ提供了一种可靠的方式来保证消息的投递与本地事务的原子性,避免了因网络问题或系统故障导致的消息丢失或重复消费。本文将介绍如何使用Spring Boot集成RocketMQ事务消息,并通过实际代码示例演示其应用。
一.执行流程
- 1、Producer 向 broker 发送半消息
- 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
- 3、Producer 端执行本地事务。
- 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
- 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
- 6、Producer 端查询本地事务的状态
- 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
- 8、消费者段消费到消息之后,执行本地事务。
简单概括就是,生产者向mq发送消息,mq收到消息后,会让生产者去执行本地事务,成功就返回commit,失败就返回rollback,如果mq迟迟没有等到生产者的确认,就会定时回查生产者的事务状态
二、代码示例 Spring Boot集成RocketMQ事务消息
1. 依赖与配置
pom.xml:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: tx-group
send-message-timeout: 3000
2.场景假设
- 服务A(订单服务):创建订单
- 服务B(库存服务):扣减库存
3.代码示例
3.1订单服务
定义事务监听器:
@Slf4j
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
// 执行本地事务(如创建订单)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = parseOrder(msg);
orderService.create(order); // 本地事务
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 事务回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getKeys();
boolean success = orderService.checkStatus(orderId);
return success ? COMMIT : ROLLBACK;
}
}
发送事务消息
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) {
Message<Order> message = MessageBuilder
.withPayload(order)
.setHeader(RocketMQHeaders.KEYS, order.getId())
.build();
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic", message, null);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("事务消息发送失败");
}
}
}
3.2库存服务
// 库存服务的事务监听器
@RocketMQTransactionListener
public class InventoryTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private InventoryService inventoryService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
InventoryDTO dto = parseDTO(msg);
inventoryService.deduct(dto); // 扣减库存
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 本地事务失败,发送补偿消息到订单服务
rocketMQTemplate.send("order-rollback-topic", buildRollbackMessage(dto));
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 回查逻辑(略)
}
@Slf4j
@Service
@RocketMQMessageListener(
topic = "order-rollback-topic",
consumerGroup = "order-rollback-group"
)
public class OrderRollbackConsumer implements RocketMQListener<OrderRollbackDTO> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(OrderRollbackDTO dto) {
// 根据订单ID回滚订单
orderService.cancelOrder(dto.getOrderId());
}
}
订单服务监听补偿消息
4. 关键回滚机制
场景 | 回滚策略 |
---|---|
本地事务失败 | 直接返回ROLLBACK ,Broker删除半消息,消息不投递 |
下游服务失败 | 下游服务发送补偿消息到上游,触发上游回滚(如订单取消) |
消息消费重试 | RocketMQ默认重试16次,超过后进入死信队列,需人工处理 |