当前位置: 首页 > article >正文

android接入rocketmq

一 前言

RocketMQ 作为一个功能强大的消息队列系统,不仅支持基本的消息发布与订阅,还提供了顺序消息、延时消息、事务消息等高级功能,适应了复杂的分布式系统需求。其高可用性架构、多副本机制、完善的运维管理工具,以及安全控制功能,使其成为企业级应用的首选消息中间件。
在Android应用中,你可以使用RocketMQ的客户端库来发送和接收消息.

二 接入流程

1 添加依赖

在Android项目的build.gradle文件中添加RocketMQ客户端库的依赖。

dependencies {
    implementation 'org.apache.rocketmq:rocketmq-client:5.3.1'
}
2 添加权限
<uses-permission android:name="android.permission.INTERNET" />
3 接收消息
ExecutorService executor = Executors.newFixedThreadPool(20);  //根据项目需要设置常用线程个数
String TAG = "MainActivity";
String GROUP = "producer";
String ADDRESS = "192.168.1.84:9876";
String KEY = "key";

executor.submit(() -> {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
                consumer.setNamesrvAddr(ADDRESS);
                // 订阅 topic 下的全部 tab
                consumer.subscribe(TOPIC, "*");
                // BROADCASTING:广播模式,把消息发给了所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组, CLUSTERING:集群模式(默认值),每一条消息只会被同一个消费者组中的一个实例消费
                consumer.setMessageModel(MessageModel.CLUSTERING);
                // CONSUME_FROM_LAST_OFFSET:从最新的偏移值开始消费(默认值), CONSUME_FROM_FIRST_OFFSET:从队列最开始的偏移值开始消费, CONSUME_FROM_TIMESTAMP:从指定的时间戳处开始消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                // yyyyMMddHHmmss: 当选择从指定的时间戳处开始消费时, 需要指定该时间戳
                // consumer.setConsumeTimestamp("");
                // 使用并发方式从多个MessageQueue中取数据的方式监听
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                    System.out.println();
                    for (MessageExt msg : msgs) {
                        Log.e(TAG,"收到消息:"+new String(msg.getBody()));
                    }
                    // 返回消费成功, 还可以是 RECONSUME_LATER:稍后重新消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
                TimeUnit.DAYS.sleep(1);
            } catch (Throwable cause) {
                cause.printStackTrace();
            }
        });
4 发送消息
 executor.submit(() -> {
            try {
                DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
                producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
                producer.start();

                // 同步传递消息,消息会发给集群中的一个Broker节点。
                Message message = new Message(TOPIC, TAG, KEY, "android  hello word ss".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult result = producer.send(message);

                Log.e(TAG,"发送消息结果:result:"+ JSON.toJSONString(result));
                producer.shutdown();
            } catch (Exception e) {
                Log.e(TAG,"发送失败:"+e.getCause().toString());
                e.printStackTrace();
            }
        });

三 问题

启动项目,点击发送消息,项目报了异常信息,如下

java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory;  报错

这是因为RocketMQ客户端库依赖于Java标准库中的 java.lang.management.ManagementFactory 类,而Android并不完全支持Java标准库,尤其是 java.lang.management 包。

RocketMQ官方没有专门为Android提供适配版本,所以可以尝试使用这些版本,或者自己修改RocketMQ源码,移除对 ManagementFactory 的依赖。

四 修改源码

在github中,把rocketmq-client源码下载到本地
https://github.com/apache/rocketmq
在这里插入图片描述

导入到本地如下
在这里插入图片描述
然后找到前面ManagementFactory 报错的地方,将它移除或者用其他方法代替,经排查在
org.apache.rocketmq.common.UtilAll 有相关的引用
在这里插入图片描述
该方法则是为了通过获取jvm的进程ID,这边我们可以把它注释掉,然后用个固定值代替试下

 static {
        HEX_ARRAY = "0123456789ABCDEF".toCharArray();
       /* Supplier<Integer> supplier = () -> {
            // format: "pid@hostname"
            String currentJVM = ManagementFactory.getRuntimeMXBean().getName();
            try {
                return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));
            } catch (Exception e) {
                return -1;
            }
        };
        PID = supplier.get();*/
        PID = 888888;
    }

以及在org.apache.rocketmq.common.MixAll也有ManagementFactory相关引用,这个作用是获取当前java虚拟机(JVM)的进程ID,可以将其注释,然后返回固定的结果

public static long getPID() {
      String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
        if (StringUtils.isNotEmpty(processName)) {
            try {
                return Long.parseLong(processName.split("@")[0]);
            } catch (Exception e) {
                return 0;
            }
        }
        return 0;
    }

最后还有一个地方有涉及到,在包路径org.apache.rocketmq.store.StoreUtil,其作用是为了获取当前机器的总物理内存大小(以字节为单位)

 public static long getTotalPhysicalMemorySize() {
       long physicalTotal = 1024 * 1024 * 1024 * 24L;
        OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
        if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
            physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
        }
        return physicalTotal;
    }

将相关的包修改后,然后将其重新打包,在maven工具下,选择rocketmq-common,选择Plugins下的jar组件,选中下面的jar进行打包
在这里插入图片描述
打包完成后,在模块的target目录下生成jar包
在这里插入图片描述
android需要用到的包如下:

implementation files('libs\\rocketmq-remoting-5.3.1.jar')
implementation files('libs\\rocketmq-client-5.3.1.jar')
implementation files('libs\\rocketmq-common-5.3.1.jar')

implementation 'io.github.aliyunmq:rocketmq-logback-classic:1.0.1'
implementation 'com.google.guava:guava:31.1-jre'
implementation 'commons-validator:commons-validator:1.7'

将模块rocketmq-remoting,rocketmq-client,rocketmq-commo三个模块重新打包后导入,然后再加上下面那三个相关联的依赖包.重新用android应用进行收发信息,测试如下:

2025-03-04 14:51:22.272 13676-13795/? E/MainActivity: 收到消息:android  hello word ss
2025-03-04 14:51:22.279 13676-13785/? E/MainActivity: 发送消息结果:result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"TopicTestLss"},"msgId":"C10005FD90380CA347BF12A326F00000","offsetMsgId":"C2000B5400002A9F000000000007A575","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true,"transactionId":"C10005FD90380CA347BF12A326F00000"}


http://www.kler.cn/a/573779.html

相关文章:

  • FastGPT 源码:混合检索调用链路
  • PHP fastadmin 学习
  • [杂学笔记]HTTP1.0和HTTP1.1区别、socket系列接口与TCP协议、传输长数据的时候考虑网络问题、慢查询如何优化、C++的垃圾回收机制
  • MQ保证消息的顺序性
  • 如何在Windows下离线部署DeepSeek并以WebApi形式调用
  • Golang的代码注释规范指南
  • Macro Bullion:国际金价回调态势下,金市表现与金银交易建议
  • 数据结构与算法:回溯(下):子集相关力扣题(78.子集、90.子集Ⅱ、491.非递减子序列)、排列相关力扣题(46.全排列、47.全排列Ⅱ)
  • es如何进行refresh?
  • 鸿蒙NEXT开发-端云一体化开发
  • Python 爬取唐诗宋词三百首
  • 由麻省理工学院计算机科学与人工智能实验室等机构创建低成本、高效率的物理驱动数据生成框架,助力接触丰富的机器人操作任务
  • 闭包:前端开发的“记忆胶囊“与Vue框架的“隐身特工“
  • ANI AGI ASI的区别
  • python学习第三天
  • C# Enumerable类 之 数据(类型)转换
  • 【菜笔cf刷题日常-1600】C. Balanced Stone Heaps(二分求min/max)
  • 整除分块 2025牛客寒假算法基础集训营3G
  • Kotlin 协程(三)协程的常用关键字使用及其比较
  • Visual Stdio 2022 C#调用DeepSeek API