android接入rocketmq
一 前言
RocketMQ 作为一个功能强大的消息队列系统,不仅支持基本的消息发布与订阅,还提供了顺序消息、延时消息、事务消息等高级功能,适应了复杂的分布式系统需求。其高可用性架构、多副本机制、完善的运维管理工具,以及安全控制功能,使其成为企业级应用的首选消息中间件。
在Android应用中,你可以使用RocketMQ的客户端库来发送和接收消息.
二 接入流程
1 添加依赖
在Android项目的build.gradle文件中添加RocketMQ客户端库的依赖。
dependencies {
implementation 'org.apache.rocketmq:rocketmq-client:5.3.1'
}
2 添加权限
<uses-permission android:name="android.permission.INTERNET" />
3 接收消息
ExecutorService executor = Executors.newFixedThreadPool(20); //根据项目需要设置常用线程个数
String TAG = "MainActivity";
String GROUP = "producer";
String ADDRESS = "192.168.1.84:9876";
String KEY = "key";
executor.submit(() -> {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
consumer.setNamesrvAddr(ADDRESS);
// 订阅 topic 下的全部 tab
consumer.subscribe(TOPIC, "*");
// BROADCASTING:广播模式,把消息发给了所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组, CLUSTERING:集群模式(默认值),每一条消息只会被同一个消费者组中的一个实例消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// CONSUME_FROM_LAST_OFFSET:从最新的偏移值开始消费(默认值), CONSUME_FROM_FIRST_OFFSET:从队列最开始的偏移值开始消费, CONSUME_FROM_TIMESTAMP:从指定的时间戳处开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// yyyyMMddHHmmss: 当选择从指定的时间戳处开始消费时, 需要指定该时间戳
// consumer.setConsumeTimestamp("");
// 使用并发方式从多个MessageQueue中取数据的方式监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println();
for (MessageExt msg : msgs) {
Log.e(TAG,"收到消息:"+new String(msg.getBody()));
}
// 返回消费成功, 还可以是 RECONSUME_LATER:稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
TimeUnit.DAYS.sleep(1);
} catch (Throwable cause) {
cause.printStackTrace();
}
});
4 发送消息
executor.submit(() -> {
try {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
// 同步传递消息,消息会发给集群中的一个Broker节点。
Message message = new Message(TOPIC, TAG, KEY, "android hello word ss".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
Log.e(TAG,"发送消息结果:result:"+ JSON.toJSONString(result));
producer.shutdown();
} catch (Exception e) {
Log.e(TAG,"发送失败:"+e.getCause().toString());
e.printStackTrace();
}
});
三 问题
启动项目,点击发送消息,项目报了异常信息,如下
java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory; 报错
这是因为RocketMQ客户端库依赖于Java标准库中的 java.lang.management.ManagementFactory 类,而Android并不完全支持Java标准库,尤其是 java.lang.management 包。
RocketMQ官方没有专门为Android提供适配版本,所以可以尝试使用这些版本,或者自己修改RocketMQ源码,移除对 ManagementFactory 的依赖。
四 修改源码
在github中,把rocketmq-client源码下载到本地
https://github.com/apache/rocketmq
导入到本地如下
然后找到前面ManagementFactory 报错的地方,将它移除或者用其他方法代替,经排查在
org.apache.rocketmq.common.UtilAll 有相关的引用
该方法则是为了通过获取jvm的进程ID,这边我们可以把它注释掉,然后用个固定值代替试下
static {
HEX_ARRAY = "0123456789ABCDEF".toCharArray();
/* Supplier<Integer> supplier = () -> {
// format: "pid@hostname"
String currentJVM = ManagementFactory.getRuntimeMXBean().getName();
try {
return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));
} catch (Exception e) {
return -1;
}
};
PID = supplier.get();*/
PID = 888888;
}
以及在org.apache.rocketmq.common.MixAll也有ManagementFactory相关引用,这个作用是获取当前java虚拟机(JVM)的进程ID,可以将其注释,然后返回固定的结果
public static long getPID() {
String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
if (StringUtils.isNotEmpty(processName)) {
try {
return Long.parseLong(processName.split("@")[0]);
} catch (Exception e) {
return 0;
}
}
return 0;
}
最后还有一个地方有涉及到,在包路径org.apache.rocketmq.store.StoreUtil,其作用是为了获取当前机器的总物理内存大小(以字节为单位)
public static long getTotalPhysicalMemorySize() {
long physicalTotal = 1024 * 1024 * 1024 * 24L;
OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();
if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();
}
return physicalTotal;
}
将相关的包修改后,然后将其重新打包,在maven工具下,选择rocketmq-common,选择Plugins下的jar组件,选中下面的jar进行打包
打包完成后,在模块的target目录下生成jar包
android需要用到的包如下:
implementation files('libs\\rocketmq-remoting-5.3.1.jar')
implementation files('libs\\rocketmq-client-5.3.1.jar')
implementation files('libs\\rocketmq-common-5.3.1.jar')
implementation 'io.github.aliyunmq:rocketmq-logback-classic:1.0.1'
implementation 'com.google.guava:guava:31.1-jre'
implementation 'commons-validator:commons-validator:1.7'
将模块rocketmq-remoting,rocketmq-client,rocketmq-commo三个模块重新打包后导入,然后再加上下面那三个相关联的依赖包.重新用android应用进行收发信息,测试如下:
2025-03-04 14:51:22.272 13676-13795/? E/MainActivity: 收到消息:android hello word ss
2025-03-04 14:51:22.279 13676-13785/? E/MainActivity: 发送消息结果:result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"TopicTestLss"},"msgId":"C10005FD90380CA347BF12A326F00000","offsetMsgId":"C2000B5400002A9F000000000007A575","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true,"transactionId":"C10005FD90380CA347BF12A326F00000"}