当前位置: 首页 > article >正文

RocketMQ事务消息原理

RocketMQ事务消息原理

RocketMQ的事务消息是一种确保消息与本地事务一致性的机制,广泛应用于分布式系统中,尤其是需要保证跨服务数据一致性的场景。其核心原理是在生产者发送消息到Broker时,先发送一个“半消息”,然后生产者执行本地事务。如果事务成功,生产者会发送提交(Commit)请求,Broker将半消息标记为正常消息;若失败,则发送回滚(Rollback)请求,Broker丢弃该消息。如果生产者未及时确认,Broker会进行事务回查,确保消息的一致性处理。通过这种机制,RocketMQ提供了一种可靠的方式来保证消息的投递与本地事务的原子性,避免了因网络问题或系统故障导致的消息丢失或重复消费。本文将介绍如何使用Spring Boot集成RocketMQ事务消息,并通过实际代码示例演示其应用。

一.执行流程

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务。

简单概括就是,生产者向mq发送消息,mq收到消息后,会让生产者去执行本地事务,成功就返回commit,失败就返回rollback,如果mq迟迟没有等到生产者的确认,就会定时回查生产者的事务状态

二、代码示例 Spring Boot集成RocketMQ事务消息

1. 依赖与配置

pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

application.yml

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: tx-group
    send-message-timeout: 3000

2.场景假设

  • 服务A(订单服务):创建订单
  • 服务B(库存服务):扣减库存

3.代码示例

3.1订单服务
定义事务监听器:
@Slf4j
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    // 执行本地事务(如创建订单)
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = parseOrder(msg);
            orderService.create(order); // 本地事务
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 事务回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getKeys();
        boolean success = orderService.checkStatus(orderId);
        return success ? COMMIT : ROLLBACK;
    }
}
发送事务消息
@Service
public class OrderService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void createOrder(Order order) {
        Message<Order> message = MessageBuilder
            .withPayload(order)
            .setHeader(RocketMQHeaders.KEYS, order.getId())
            .build();
        
        // 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-topic", message, null);
        
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("事务消息发送失败");
        }
    }
}
3.2库存服务
// 库存服务的事务监听器
@RocketMQTransactionListener
public class InventoryTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private InventoryService inventoryService;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            InventoryDTO dto = parseDTO(msg);
            inventoryService.deduct(dto); // 扣减库存
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 本地事务失败,发送补偿消息到订单服务
            rocketMQTemplate.send("order-rollback-topic", buildRollbackMessage(dto));
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 回查逻辑(略)
}

@Slf4j
@Service
@RocketMQMessageListener(
    topic = "order-rollback-topic",
    consumerGroup = "order-rollback-group"
)
public class OrderRollbackConsumer implements RocketMQListener<OrderRollbackDTO> {
    
    @Autowired
    private OrderService orderService;

    @Override
    public void onMessage(OrderRollbackDTO dto) {
        // 根据订单ID回滚订单
        orderService.cancelOrder(dto.getOrderId());
    }
}
订单服务监听补偿消息

4. 关键回滚机制

场景回滚策略
本地事务失败直接返回ROLLBACK,Broker删除半消息,消息不投递
下游服务失败下游服务发送补偿消息到上游,触发上游回滚(如订单取消)
消息消费重试RocketMQ默认重试16次,超过后进入死信队列,需人工处理

http://www.kler.cn/a/557571.html

相关文章:

  • 爬虫小案例豆瓣电影top250(json格式)
  • C++如何获取windows系统通知消息列表
  • RoCBert:具有多模态对比预训练的健壮中文BERT
  • 【Qt】可爱的窗口关闭确认弹窗实现
  • Svelte 最新中文文档教程(16)—— Context(上下文)
  • 微信小程序数据绑定与事件处理:打造动态交互体验
  • Ubuntu 22.04 Install deepseek
  • MongoDB应用设计调优
  • My Metronome for Mac v1.4.2 我的节拍器 支持M、Intel芯片
  • 微软CEO-纳德拉访谈-AGI计划
  • RT-Thread+STM32L475VET6——USB鼠标模拟
  • Java 虚拟机(JVM)方法区详解
  • tortoiseSVN 如何克隆项目到本地
  • 机器学习实战(7):聚类算法——发现数据中的隐藏模式
  • 化学品安全数据表(MSDS)的全面解析与实用指南
  • JAVAweb-标签选择器,盒模型,定位,浮动
  • Linux 系统中的软链接与硬链接
  • Web 开发中的 5 大跨域标签解析:如何安全地进行跨域请求与加载外部资源
  • YOLOv11-ultralytics-8.3.67部分代码阅读笔记-utils.py
  • ragflow-RAPTOR到底是什么?请通俗的解释!