当前位置: 首页 > article >正文

RocketMQ 事务消息 原理及使用方法解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月24日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • RocketMQ事务消息
  • 适用场景举例
  • 使用示例
    • 发送事务消息
    • 事务回查
    • 事务执行

RocketMQ事务消息

RocketMQ针对事务消息扩展了两个相关的概念:

1. 半消息

半消息(Half Message)是一种特殊的消息类型,处于这个状态的消息暂时不能被Consumer消费。

当一条事务消息被成功投递到Broker上,但Broker没有收到Producer的二次确认时,该事务消息就处于暂时不可消费的状态,这种消息就是半消息


2. 消息状态回查

由于网络抖动、系统宕机等等原因,可能导致ProducerBroker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态。

这个机制主要是用来解决分布式事务中的超时问题。

在这里插入图片描述
上图是RocketMQ官网提供的事务消息流程图,执行步骤如下:

  1. ProducerBroker端发送半消息
  2. Broker发送ACK确认,表示半消息发送成功
  3. Producer执行本地事务
  4. 本地事务完毕,根据事务的状态,ProducerBroker发送二次确认消息,确认该半消息的CommitRollback状态。Broker收到二次确认消息之后:如果是Commit状态,则直接将消息发送到Consumer端执行消费逻辑;如果是Rollback状态,则会直接将其标记为失败,不会发送给Consumer
  5. 针对超时情况,Broker主动向Producer发起消息回查
  6. Producer处理回查消息,返回对应的本地事务执行结果
  7. Broker针对消息回查的结果,执行【步骤4】的操作

适用场景举例

以转账系统为例,假设A要向B转账100元,执行本地事务和发送异步消息的过程应该同时保持成功或失败,即A的账户扣款成功后,就一定要发消息发送出去,最直观的思路可能有两个:

1. 先发消息

这种策略的流程如下:
在这里插入图片描述
存在的问题是: 如果消息发送成功,但后续A扣款失败了,消费端仍然会消费这条消息,进而向B账户里打钱,数据就出现不一致的情况了。


2. 后发消息

在这里插入图片描述
存在的问题是: 如果扣款成功,但是发送消息失败,就会出现A已经扣钱了,但B账户里没有入账的情况,同样也是无法接受的。

出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的,因此也就无法做到同时失败或同时成功,所以数据一致性难以保障。


解决上述问题的方法就是上面提到的半消息
在这里插入图片描述
如上图所示,执行本地事务之前先发送一个半消息,此时还不能被消费者消费,只有当本地事务执行完毕并发送二次确认消息之后,半消息才能被Consumer消费。

如此以来就保证了多个系统数据的数据一致性,前提是系统不需要保证数据的强一致性


使用示例

发送事务消息

RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程,因此这个过程可能会“稍显复杂”

发送事务消息使用的是TransactionMQProducer,一个简单的demo如下:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionCheckListener transactionCheckListener = new TransactionCheckListener() {
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                return null;
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("GROUP A");

        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[]{"TAG A", "TAG B", "TAG C"};
        LocalTransactionExecuter transactionExecuter = new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                return null;
            }
        };
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TEST", tags[i % tags.length], "KEY " + i, ("HELLO, ROCKETMQ" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, null);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

事务回查

checkLocalTransaction是事务消息回查监听方法,可以获取本地事务状态,根据事务的状态来确定是否要发送二次确认消息,或者进行事务回滚操作。

消息回查事务的状态由以下几种情况:

  1. LocalTransactionState.ROLLBACK_MESSAGE:事务回滚
  2. LocalTransactionState.COMMIT_MESSAGE:事务提交
  3. LocalTransactionState.UNKNOW:未知状态,此时Broker会定时重新查询Producer消息的状态,直到出现前面两种情况。
public interface TransactionListener {
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

事务执行

executeLocalTransaction方法用于执行本地事务,如果本地事务执行成功则进行事务提交,否则进行事务回滚,如果是UNKNOW状态的话,Broker就会定时回查Producer的消息状态,直到彻底成功或失败。

public interface TransactionListener {
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
}


http://www.kler.cn/a/3926.html

相关文章:

  • 蓝桥杯训练—矩形面积交
  • 高等数学:映射与函数
  • 深度学习 DAY1:RNN 神经网络及其变体网络(LSTM、GRU)
  • # [Unity] 【游戏开发】获取物体和组件的脚本方法
  • 微信消息群发(定时群发)-UI自动化产品(基于.Net平台+C#)
  • 论文阅读:CosAE Learnable Fourier Series for Image Restoration
  • 【MySQL】DDL_修改、删除数据库表
  • 脉诊(切脉、诊脉、按脉、持脉)之法——入门篇
  • 【SpringBoot17】SpringBoot中使用Quartz管理定时任务
  • 以太坊2.0-上海升级节点详细搭建文档
  • “蓝桥杯”递推和递归(一)——取数位
  • 通过小三越位,彻底弄懂 https 原理本质(三)加密漏洞
  • Python中 __init__的通俗解释是什么?
  • Linux项目自动化构建工具-make/makefile 介绍及使用
  • 【RV1126】调试GT911,1024x600 7寸 MIPI 电容触摸屏
  • 在 Python 中将字符串转换为集合
  • Vector - CAPL - 实时时间on *(续2)
  • 程序员面试攻略:面试中的技巧(付费资料)
  • 淘宝天猫价格监控接入方案
  • 面试热点题:回溯算法 递增子序列与全排列 II
  • 【chatgpt】点云转图片后圆特征检测
  • Linux系统之安装PostgreSQL数据库
  • 我的第一台电脑的故事
  • 亚马逊、eBay、速卖通等跨境电商自养号测评,你知道多少?
  • CCF-CSP认证 202303 500分题解
  • java八股文--数据库