当前位置: 首页 > article >正文

springboot 整合 RocketMQ 可用于物联网,电商高并发场景下削峰,保证系统的高可用

本文根据阿里云 RocketMQ产品文档整理,地址:https://help.aliyun.com/document_detail/29532.html?userCode=qtldtin2

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。下面我们来搭建RocketMQ

先去官网下载RocketMQ:

下载 | RocketMQ

 选择最新版本下载,下载后解压:

设置环境变量:

然后启动服务端:

已经正常启动。

接着启动broker

已经正常启动!

application.yaml文件配置mq生产者和消费者:

server:
  port: 8083

spring:
  application:
    name: springboot-rocketmq

rocketmq:
  # nameserver地址
  consumer:
    group: ${spring.application.name}-consumer-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10
  name-server: 192.168.1.104:9876
  producer:
    #指定发送者组名 
    group: ${spring.application.name}
    send-message-timeout: 300000
    compress-message-body-threshold: 4096
    max-message-size: 4194304
    retry-times-when-send-async-failed: 0
    retry-next-server: true
    retry-times-when-send-failed: 2 

pom文件添加依赖:

 

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

接着我们进行生产消息:

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private MessageSender messageSender;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @GetMapping("/syncSendBatch/{nums}")
    public String syncSendBatch(@PathVariable("nums") Integer nums) {
        messageSender.syncSend(nums);
        return "发送成功";
    }

 

}

 写上消息发送处理,这里通过接受的数量,进行延迟发送接收到的次数个消息:

@Component
public class MessageSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate; 


    // 延时消息
    public void syncSend(Integer nums){
        /**
         * 发送可靠同步消息 ,可以拿到SendResult 返回数据
         * 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
         * 这种方式应用场景非常广泛,例如重要的右键通知、报名短信通知、营销短信等。
         *
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         * 参数3: 超时时间 毫秒
         */
        for (int i = 0; i < nums; i++) {
            SendResult result= rocketMQTemplate.syncSend("test-send","测试同步消息:"+i,3000);
//            System.out.println(result.getMessageQueue());
            System.out.println(result);
        }
    }

    
}

接下来使用监听来消费消息:

@Component
@RocketMQMessageListener(topic = "test-send", consumerGroup = "${spring.application.name}-consumer-group",
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("-------接收到rocketmq消息:" + message);
    }
}

接着我通过接口请求来生产消息:

 

 

测试1000000万的消息很快,实际中需要处理复杂的业务,会有事务处理,这个时候我们需要进行分布式部署,削峰,保证系统的高可靠性。

我们可以通过控制台来观察消息的收发情况:

去官网下载:

https://github.com/apache/rocketmq-externals/tree/develop/dev

下载后修改服务器地址即可:


http://www.kler.cn/news/155096.html

相关文章:

  • HarmonyOS应用开发——程序框架UIAbility、启动模式与路由跳转
  • 鸿蒙绘制折线图基金走势图
  • 一缕青丝寄相思
  • 万宾科技第四代可燃气体监测仪的作用
  • C-语言每日刷题
  • 测试类运行失败:TestEngine with ID ‘junit-jupiter‘ failed to discover tests
  • docker+jmeter+influxdb+granfana
  • 力扣labuladong一刷day25天
  • MacDroid Pro for Mac – 安卓设备文件传输助手,实现无缝连接与传输!
  • 整数分频,奇偶分频。
  • 【BVITS2】配置debug记录——Bert-VITS2-Integration-Pack-v2.0.2
  • 第十节HarmonyOS 使用资源引用类型
  • 在 TypeScript 中,interface、implements 和 extends 的作用
  • WT2003H语音芯片系列:通过bin文件实现板载语音更新,支持宽范围音频码率
  • CC++枚举类型与类型定义(typedef)
  • 【MySql】悲观锁和乐观锁的介绍
  • Micropython for QNX编译过程
  • Linux下配置邮箱客户端MUTT,整合msmtp + procmail + fetchmail
  • idea通过remote远程调试云服务器
  • 2015年五一杯数学建模C题生态文明建设评价问题解题全过程文档及程序
  • 分享一些Git的常用命令
  • 【Python】Gym的使用
  • OpenTelemetry系列 - 第2篇 Java端接入OpenTelemetry
  • ctfhub技能树_web_web前置技能_HTTP
  • Python海绵宝宝
  • 【SpringMVC】Spring Web MVC入门(一)
  • 公平锁和非公平锁以及他们的实现原理是什么
  • react-route-dom 实现简单的嵌套路由
  • linux如何杀死进程_kill
  • 吸积效应:为什么接口会越来越臃肿?我们从一个接口说起