当前位置: 首页 > article >正文

重学SpringBoot3-集成RocketMQ(二)

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(二)

  • 1. 基础概念
  • 2. 准备工作
  • 3. 实现事务消息的生产者
  • 4. 事务监听器实现
  • 5. 消费者示例
  • 6. 发送事务消息
  • 7. 测试
    • 7.1 模拟本地事务正常提交
    • 7.2 模拟本地事务提交失败,未回查
    • 7.3 模拟本地事务提交失败,回查成功
    • 7.4 模拟本地事务提交失败,回查失败
    • 7.5 模拟本地事务提交成功,消费失败
  • 关键点总结

今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。

1. 基础概念

在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。

图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html

半消息的工作流程:

  1. 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
  2. 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
  3. 提交或回滚消息
    • 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
    • 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
  4. 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。

现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。

2. 准备工作

请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:

  consumer2:
    group: springboot-consumer-group2  # 新的消费者组名称
    topic: transaction-topic  # 订阅新的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

3. 实现事务消息的生产者

创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(String topic, String message) {
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
        System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());
    }
}

4. 事务监听器实现

通过实现 RocketMQLocalTransactionListener 接口,定义事务的提交或回滚逻辑。

package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态
        try {
            // 模拟本地事务处理逻辑
            System.out.println("Executing local transaction...");
            boolean success = performLocalTransaction();
            if (success) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查逻辑,确认本地事务的最终状态
        System.out.println("Checking local transaction...");

        // 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE
        System.out.println("local transaction check success");
        return RocketMQLocalTransactionState.COMMIT;
    }

    private boolean performLocalTransaction() {
        // TODO 模拟本地事务处理文件上传OSS
        try {
            System.out.println("Upload files to OSS...");
            Thread.sleep(3000);
            System.out.println("File upload to OSS completed");
            return true;
        } catch (InterruptedException e) {
            System.out.println("Failed to upload file to OSS");
            return false;
        }
    }

}

5. 消费者示例

创建一个消费者,订阅并消费事务消息。

RocketMQListener<String> 是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.printf("Received message: %s%n", message);
    }
}

6. 发送事务消息

在服务中调用事务消息生产者:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendTransactionMessage")
    public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {
        rocketMQProducer.sendTransactionMessage("transaction-topic", message);
        return ResponseEntity.ok("Transaction message sent: " + message);
    }
}

7. 测试

7.1 模拟本地事务正常提交

如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。

本地事务正常提交

7.2 模拟本地事务提交失败,未回查

在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000 表示 RocketMQ 最长等待 10 秒后进行事务状态回查。

本地事务提交失败,未回查

7.3 模拟本地事务提交失败,回查成功

本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。

transactionCheckMax

事务状态回查成功

7.4 模拟本地事务提交失败,回查失败

修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。

回查失败

7.5 模拟本地事务提交成功,消费失败

例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试,直至成功。

修改一下消费者处理逻辑:

package com.example.boot308rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {


    @Override
    public void onMessage(MessageExt message) {
        try {
            // 处理消息的业务逻辑
            String msgBody = new String(message.getBody(), "UTF-8");
            log.info("Received message:{}", msgBody);

            // 模拟业务处理
            boolean success = processBusinessLogic(msgBody);

            if (!success) {
                throw new RuntimeException("Business processing failure");
            }

            log.info("Business processing successful");

        } catch (Exception e) {
            log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());

            // 重新抛出异常,让 RocketMQ 进行重试
            throw new RuntimeException("Message consumption failed, retrying");
        }
    }

    private boolean processBusinessLogic(String message) {
        // 这里是业务逻辑,返回 true 表示成功,false 表示失败
        // 例如:数据库操作或其他远程调用
        return Math.random() > 0.5; // 模拟随机成功或失败
    }
}

消费失败

关键点总结

  1. 事务消息发送:通过 RocketMQTemplate.sendMessageInTransaction() 方法发送事务消息。
  2. 本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
  3. 事务回查:在 checkLocalTransaction() 方法中定义如何检查事务消息的最终状态。
    以上就是 SpringBoot3 结合 RocketMQ 实现的事务消息,提供了分布式事务中的最终一致性。

http://www.kler.cn/news/303821.html

相关文章:

  • Python(TensorFlow和PyTorch)及C++注意力网络导图
  • Docker 安装 Nacos 教程
  • L3级智能网联汽车准入试点详细解析及所需材料
  • oracle 如何查死锁
  • Web大学生网页作业成品——动漫喜羊羊网页设计与实现(HTML+CSS)(4个页面)
  • 趣味SQL | 从围棋收官到秦楚大战的数据库SQL语言实现
  • Flutter自定义Icon的简易使用(两种)
  • 项目——负载均衡OJ
  • 【数据库】瀚高数据库Windows/Linux、下载安装、高级用法、数据迁移(保姆篇)
  • MongoDB设置系统服务启动教程
  • c/c++ *和后置++的优先级
  • YOLOv8+Deepsort+PyQt+GUI 语义分割+目标检测+姿态识别 三者合一(集成于一套系统)综合视觉分析系统
  • 数据库设计中的需求分析
  • TCPIP网络编程(尹圣雨)UDP 轮流收发消息(windows)
  • CSS 新特性查漏补缺,快来看看你用过几个?
  • C#使用TCP-S7协议读写西门子PLC(一)
  • SAM 2:分割图像和视频中的任何内容
  • ERP进销存多仓库管理系统源码 带完整的安装代码包以及搭建部署教程
  • Unity Timeline
  • HTTP 协议和 APACHE 服务
  • 零基础国产GD32单片机编程入门(二十五)USB口介绍及CDC类虚拟串口通讯详解及源码
  • 【深入浅出Redis】Redis常见问题以及解决方案,可用于面试
  • RK_android8.1在设置中去掉Bluetooth功能
  • 学习笔记 - 知识图谱的符号表示方法
  • Python去除图片中的白色像素点背景
  • David Baker 任科学顾问,初创公司发布世界最大蛋白质相互作用数据库,已获 8 轮融资
  • WPS如何删除表格下的空白页
  • 常见的站长工具接口有哪些?站长们该如何选择?
  • uniapp child.onFieldChange is not a function
  • Android Studio 安装配置教程(Windows最详细版)