当前位置: 首页 > article >正文

Spring Boot集成RocketMQ实现普通、延时、事务消息发送接收、PULL消费模式及开启ACL | Spring Cloud 30

一、前言

在前面我们通过以下章节对RocketMQ有了基础的了解:

docker-compose 搭建RocketMQ 5.1.0 集群(双主双从模式) | Spring Cloud 28

docker-compose 搭建RocketMQ 5.1.0 集群开启ACL权限控制 | Spring Cloud 29

现在开始我们正式学习Spring Boot中集成RocketMQ使用,,在本章节主要进行对以下部分讲解说明:

  • 普通消息的发送接收
  • 延时消息的发送接收
  • 事务消息的发送接收
  • 发送端和接收端开启ACL
  • PULL模式消费及@ExtRocketMQConsumerConfiguration使用

二、项目集成RocketMQ

2.1 项目总体结构

在这里插入图片描述

2.2 引入依赖

rocketmq/pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2.3 配置文件

rocketmq/src/main/resources/application.yml

server:
  port: 8888

spring:
  application:
    name: @artifactId@

rocketmq:
  name-server: 192.168.0.30:9876
  producer:
    group: @artifactId@-group
    send-message-timeout: 60000  # 发送消息超时时间,单位:毫秒。默认为 3000
    retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 false
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPIC
  consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key

logging:
  level:
    org:
      springframework:
        boot:
          autoconfigure:
            logging: info

2.4主题及消费组常量

com/gm/rocketmq/component/rocketmq/TopicConstants.java

package com.gm.rocketmq.component.rocketmq;

/**
 * 主题常量
 */
public interface TopicConstants {

    String NORMAL_ROCKETMQ_TOPIC_TEST= "NORMAL_ROCKETMQ_TOPIC_TEST";

    String ORDERLY_ROCKETMQ_TOPIC_TEST= "ORDERLY_ROCKETMQ_TOPIC_TEST";

    String SCHEDULE_ROCKETMQ_TOPIC_TEST= "SCHEDULE_ROCKETMQ_TOPIC_TEST";

    String TRANSACTION_ROCKETMQ_TOPIC_TEST= "TRANSACTION_ROCKETMQ_TOPIC_TEST";

    String PULL_ROCKETMQ_TOPIC_TEST= "PULL_ROCKETMQ_TOPIC_TEST";

    String EXT_ROCKETMQ_TOPIC_TEST= "EXT_ROCKETMQ_TOPIC_TEST";

    String CONSUMER_GROUP = "_CONSUMER_GROUP";
}

三、各类型消息收发

3.1 普通消息

3.1.1 普通消息发送

     @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 向rocketmq发送同步和异步消息
     */
    @RequestMapping(value = "sendNormal", method = RequestMethod.GET)
    public String sendNormal() {
        rocketMQTemplate.send(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":sync", MessageBuilder.withPayload("同步发送消息").build());
        rocketMQTemplate.asyncSend(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":async", MessageBuilder.withPayload("异步发送消息").build(), new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送成功:{}", sendResult.getSendStatus().name());
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("异步发送失败:{}", throwable.getMessage());
            }
        });
        return "OK";
    }

3.1.2 普通消息接收

com/gm/rocketmq/component/rocketmq/NormalRocketMqListener.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class NormalRocketMqListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("普通订阅-接收到的信息:{}", s);
    }
}

@RocketMQMessageListener注解参数说明:

  • consumerGroup:消费者订阅组,它是必需的,并且必须是唯一的。
  • topic:主题名字,生产发送的主题名。
  • consumeMode:消费模式,可选择并发或有序接收消息;默认CONCURRENTLY同时接收异步传递的消息。
  • messageModel:消息模式,默认CLUSTERING集群消费;如果希望所有订阅者都接收消息,可以设置广播BROADCASTING
  • consumeThreadMax:消费者最大线程数,默认64
  • consumeTimeout:消息阻塞最长时间,默认15分钟。
  • nameServer:服务器地址,默认读取配置文件地址,可以单独为消费者设置指定位置。
  • selectorExpression:消费指定的Tag标签的业务消息。
  • ConsumerACL 功能需要在 @RocketMQMessageListener 中进行配置
  • ProducerACL 功能需要在配置文件中进行配置
  • 更多查看官方解释

3.2 顺序消息

3.2.1 顺序消息发送

    /**
     * 向rockertmq发送顺序消息,同步方式
     *
     * @return
     */
    @RequestMapping(value = "sendOrderlySync", method = RequestMethod.GET)
    public String sendOrderlySync() {
        // 订单列表
        List<OrderStep> orderList = buildOrders();
        for (int i = 0; i < 10; i++) {
            Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
            String orderId = String.valueOf(orderList.get(i).getOrderId());
            rocketMQTemplate.sendOneWayOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":sync", msg, orderId);
        }
        return "OK";
    }

    /**
     * rockertmq发送顺序消息,异步方式
     *
     * @return
     */
    @RequestMapping(value = "sendOrderlyAsync", method = RequestMethod.GET)
    public String sendOrderlyAsync() {
        // 订单列表
        List<OrderStep> orderList = buildOrders();
        for (int i = 0; i < 10; i++) {
            Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
            String orderId = String.valueOf(orderList.get(i).getOrderId());
            rocketMQTemplate.syncSendOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":async", msg, orderId);
        }
        return "OK";
    }
    
   /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}';
        }
    }

    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }

3.2.1 顺序消息接收

com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerA.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerA implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("顺序订阅-接收到的信息:{}", s);
    }
}

com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerB.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerB implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("顺序订阅-接收到的信息:{}", s);
    }
}

3.3 延时消息

3.3.1 延时消息发送

    /**
     * rockertmq发送延时消息
     *
     * @return
     */
    @RequestMapping(value = "sendSchedule", method = RequestMethod.GET)
    public String sendSchedule() {
        Message msg = MessageBuilder.withPayload("延时消息")
                .build();
        rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + ":", msg, 20);
        log.info("延时消息-发布时间:{}", System.currentTimeMillis());
        return "OK";
    }

3.3.2 延时消息接收

com/gm/rocketmq/component/rocketmq/ScheduleRocketMqListener.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class ScheduleRocketMqListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msg = "内容:" + new String(message.getBody()) + ",时间:" + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later";
        log.info("延时订阅-接收到的信息:{}", msg);
        log.info("延时消息-接受时间:{}", System.currentTimeMillis());
    }
}

3.4 发送端事务消息

3.4.1 事务消息发送

    /**
     * rockertmq发送生产端事务消息
     *
     * @return
     */
    @RequestMapping(value = "sendTransaction", method = RequestMethod.GET)
    public String sendTransaction() {
        Message msg = MessageBuilder.withPayload("事务消息")
                .build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TopicConstants.TRANSACTION_ROCKETMQ_TOPIC_TEST + ":", msg, "自定义参数");
        log.info("事务消息-发布结果:{} = {}", result.getTransactionId(), result.getSendStatus());
        return "OK";
    }

com/gm/rocketmq/component/rocketmq/TransactionListenerImpl.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // 事务消息共有三种状态,提交状态、回滚状态、中间状态:

    // RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
    // RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
    // RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。

    // executeLocalTransaction 方法来执行本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String transactionId = msg.getHeaders().get("__transactionId__").toString();
        log.info("执行本地事务,transactionId:{}", transactionId);

        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(transactionId, status);
        log.info("获取自定义参数:{}", arg);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    // checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transactionId = msg.getHeaders().get("__transactionId__").toString();
        log.info("检查本地事务状态,transactionId:{}", transactionId);
        Integer status = localTrans.get(transactionId);
        if (null != status) {
            switch (status) {
                case 0:
                    return RocketMQLocalTransactionState.UNKNOWN;
                case 1:
                    return RocketMQLocalTransactionState.COMMIT;
                case 2:
                    return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

3.5 Pull模式消费

3.5.1 源码分析

rocketmq-spring-boot-starter中关于Pull模式消费的自动配置,
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

在这里插入图片描述
其中rocketmq.name-serverrocketmq.pull-consumer.grouprocketmq.pull-consumer.topic三项配置为必填项。

剩余其他配置,org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.PullConsumer

在这里插入图片描述

由上可知,利用rocketmq-spring-boot-starter实现PULL模式,只支持单TopicPULL消费,要想对多个Topic进行PULL模式消费需要用到@ExtRocketMQConsumerConfiguration

3.5.2 PULL消费所需配置文件

配置文件新增pull-consumer相关配置,完整rocketmq/src/main/resources/application.yml

rocketmq:
  name-server: 192.168.0.30:9876
  producer:
    group: @artifactId@-group
    send-message-timeout: 60000  # 发送消息超时时间,单位:毫秒。默认为 3000
    retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 false
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPIC
  consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
  pull-consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    topic: PULL_ROCKETMQ_TOPIC_TEST
    group: PULL_ROCKETMQ_TOPIC_TEST_CONSUMER_GROUP

3.5.3 消息发送

    /**
     * 向ockertmq 消费端pull模式发生消息
     *
     * @return
     */
    @RequestMapping(value = "sendPull", method = RequestMethod.GET)
    public String pull() {
        for (int i = 0; i < 10; i++) {
            Message msg = MessageBuilder.withPayload("pull 消息" + i).build();
            rocketMQTemplate.syncSend(TopicConstants.PULL_ROCKETMQ_TOPIC_TEST + ":", msg);
        }
        for (int i = 0; i < 10; i++) {
            Message msg = MessageBuilder.withPayload("pull ext 消息" + i).build();
            rocketMQTemplate.syncSend(TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + ":", msg);
        }
        return "OK";
    }

3.5.4 @ExtRocketMQConsumerConfiguration使用

此示例利用@ExtRocketMQConsumerConfiguration定义消费,声明消费的Topic和消费组,或声明不同name-server

利用@ExtRocketMQTemplateConfiguration定义生产者,声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate

com/gm/rocketmq/component/rocketmq/ExtRocketMQTemplate.java

import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * 可用于不同name-server或者其他特定的属性来定义非标的RocketMQTemplate,此示例定义消息Topic和消费者
 */
@ExtRocketMQConsumerConfiguration(group = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        topic = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST)
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

3.5.5 PULL模式消息接收

com/gm/rocketmq/component/rocketmq/PullConsumer.java

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;

@Slf4j
@Component
public class PullConsumer implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private ExtRocketMQTemplate extRocketMQTemplate;

    @Override
    public void run(String... args) {
        while (true) {

            List<String> messages = rocketMQTemplate.receive(String.class, 5000);
            log.info("receive from rocketMQTemplate, messages={}", messages);


            messages = extRocketMQTemplate.receive(String.class, 5000);
            log.info("receive from extRocketMQTemplate, messages={}", messages);
        }
    }
}

3.6 源码

源码地址:https://gitee.com/gm19900510/springboot-cloud-example.git 中模块rocketmq


http://www.kler.cn/a/3663.html

相关文章:

  • 可视化-numpy实现线性回归和梯度下降法
  • 音频入门(一):音频基础知识与分类的基本流程
  • 【0x0052】HCI_Write_Extended_Inquiry_Response命令详解
  • SDL2:arm64下编译使用 -- SDL2多媒体库使用音频实例
  • C语言内存之旅:从静态到动态的跨越
  • Spring Boot 项目启动报错 “找不到或无法加载主类” 解决笔记
  • LORA: LOW-RANK ADAPTATION OF LARGE LAN-GUAGE MODELS
  • C++11新特性
  • 安全防御之入侵检测篇
  • 【数据结构】栈与队列:后进先出与先进先出到底是啥?
  • vue3 解决各场景 loading过度 ,避免白屏尴尬!
  • C语言番外-------《函数栈帧的创建和销毁》知识点+基本练习题+完整的思维导图+深入细节+通俗易懂建议收藏
  • 软件架构常用设计
  • linux读写锁pthread_rwlock_t
  • 模拟斗地主
  • 【c++】:list模拟实现“任意位置插入删除我最强ƪ(˘⌣˘)ʃ“
  • 【Linux】进程理解与学习Ⅲ-环境变量
  • Centos Linux 正确安装 Redis 的方式
  • C++快速排序算法(详解)
  • 【李宏毅】-各种各样的self-attention
  • Linux上搭建Discuz论坛
  • 软件测试基础篇
  • QCefView编译配置(Windows-MSVC)(11)
  • jwt 学习笔记
  • ChatGPT常用开源项目汇总
  • 动态代理原理