当前位置: 首页 > article >正文

如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

文章目录

      • 准备工作
      • 项目依赖
      • 配置 RocketMQ
      • 生产批量消息
      • 消费批量消息
      • 测试批量消息发送和消费
      • 总结
      • 推荐阅读文章

RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: batchConsumerGroup
  producer:
    group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class BatchProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessages() {
        List<Message<String>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();
            messages.add(message);
        }
        
        rocketMQTemplate.syncSend("BatchTopic", messages, 10000);
        System.out.println("批量消息发送成功!");
    }
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {

    @Override
    public void onMessage(List<String> messages) {
        System.out.println("批量接收到消息:");
        messages.forEach(message -> System.out.println("消息内容:" + message));
    }
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private BatchProducer batchProducer;

    @GetMapping("/sendBatchMessages")
    public String sendBatchMessages() {
        batchProducer.sendBatchMessages();
        return "批量消息已发送";
    }
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  1. 使用 BatchProducer 类批量发送消息。
  2. 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  3. 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

推荐阅读文章

  • 由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)

  • 如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系

  • HTTP、HTTPS、Cookie 和 Session 之间的关系

  • 什么是 Cookie?简单介绍与使用方法

  • 什么是 Session?如何应用?

  • 使用 Spring 框架构建 MVC 应用程序:初学者教程

  • 有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误

  • 如何理解应用 Java 多线程与并发编程?

  • 把握Java泛型的艺术:协变、逆变与不可变性一网打尽

  • Java Spring 中常用的 @PostConstruct 注解使用总结

  • 如何理解线程安全这个概念?

  • 理解 Java 桥接方法

  • Spring 整合嵌入式 Tomcat 容器

  • Tomcat 如何加载 SpringMVC 组件

  • “在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”

  • “避免序列化灾难:掌握实现 Serializable 的真相!(二)”

  • 如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)

  • 解密 Redis:如何通过 IO 多路复用征服高并发挑战!

  • 线程 vs 虚拟线程:深入理解及区别

  • 深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别

  • 10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!

  • “打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”

  • Java 中消除 If-else 技巧总结

  • 线程池的核心参数配置(仅供参考)

  • 【人工智能】聊聊Transformer,深度学习的一股清流(13)

  • Java 枚举的几个常用技巧,你可以试着用用


http://www.kler.cn/a/389124.html

相关文章:

  • 蔚来Java面试题及参考答案
  • request爬虫库的小坑
  • 华为机试HJ39 判断两个IP是否属于同一子网
  • 使用Matlab建立随机森林
  • 轻松上手:使用Docker部署Java服务
  • 【日志】392.判断子序列
  • Spark本地模式安装
  • MYSQL中的两种转义操作
  • Linux_调试器-gdb/cgdb的使用
  • 境内部署DIfy(上篇)
  • 软件工程中的创建型设计模式:工厂方法模式与抽象工厂模式
  • Java抽象类与接口
  • openresty入门教程:init_by_lua_block
  • ctfshow web入门黑盒测试web380-384
  • 每周算法2:数学+模拟+哈希表+栈+线性dp+贪心(简单)
  • 两个链表求并集、交集、差集
  • 微信小程序开发(二)登录流程
  • CMAKE 编译CUDA项目失败 “/usr/bin/nvcc“ is not able to compile a simple test program.
  • 通义千问API调用测试 (colab-python,vue)
  • 【机器学习】数学知识:指数
  • Android 延时操作的常用方法
  • Linux软件包管理
  • 分布式——BASE理论
  • 【harbor】离线安装2.9.0-arm64架构服务制作和升级部署
  • Java:JVM
  • 解决:使用EasyExcel导入Excel模板时出现数据导入不进去的问题