当前位置: 首页 > article >正文

【RocketMQ】RocketMQ 5.1.0版本Proxy集群模式部署实践

为了支持长远的云原生发展,RocketMQ引入了一个全新的模块:Proxy,官方对RocketMQ客户端提供了独立的开源项目:https://github.com/apache/rocketmq-clients,如果要使用这个新的客户端,必须要使用Proxy作为endpoint。

Proxy有两种搭建方式:

  • LOCAL:本地模式,顾名思义,通过追加参数,在broker本地启动
  • CLUSTER:集群模式,作为独立的集群启动,搭建完nameserver和broker后,独立部署

本地模式更适合非正式的场景,如调试、开发,线上环境还是推荐集群模式,本文基于集群模式进行部署验证。

启动proxy使用mqproxy命令:

[root@XXGL-T-TJSYZ-REDIS-01 bin]# ./mqproxy -help
usage: mqproxy [-bc <arg>] [-h] [-n <arg>] [-pc <arg>] [-pm <arg>]
 -bc,--brokerConfigPath <arg>   Broker config file path for local mode
 -h,--help                      Print help
 -n,--namesrvAddr <arg>         Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'
 -pc,--proxyConfigPath <arg>    Proxy config file path
 -pm,--proxyMode <arg>          Proxy run in local or cluster mode
[root@XXGL-T-TJSYZ-REDIS-01 bin]# 
  • -bc:使用本地模式时,指定broker的配置文件路径
  • -h:输出帮助信息;
  • -n:nameserver路径,也可以通过在配置文件中配置namesrvAddr指定;
  • -pc:proxy配置文件路径;
  • -pm:代理模式:LOCAL / CLUSTER,默认为CLUSTER(集群模式)

需要编辑的文件有:

  • bin/runserver.sh:修改GC日志目录和JVM参数;(非必须)
  • conf/rmq-proxy.json:
    主要是设置集群名、自定义端口
{
  "rocketMQClusterName": "littleCat",
  "remotingListenPort":28080,
  "grpcServerPort":28081
}

完整参数见源码:org.apache.rocketmq.proxy.config.ProxyConfig

启动脚本:

#!/bin/bash
. /etc/profile

nohup sh /neworiental/rocketmq-5.1.0/rocketmq-proxy/bin/mqproxy -n '172.24.30.192:19876;172.24.30.193:19876;172.24.30.194:19876' -pc /neworiental/rocketmq-5.1.0/rocketmq-proxy/conf/rmq-proxy.json >/dev/null 2>&1 &
echo "deploying rocketmq-proxy..."

停止脚本:

#!/bin/bash
. /etc/profile

PID=`ps -ef | grep '/neworiental/rocketmq-5.1.0/rocketmq-proxy' | grep -v grep | awk '{print $2}'`
if [[ "" !=  "$PID" ]]; then
  echo "killing rocketmq-proxy : $PID"
  kill $PID
fi

启动成功

客户端测试:

pom依赖:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.4</version>
        </dependency>

生产者:

package cn.xdf.xadd.rmq.test.newclient;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerNormalMessageExample {

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "172.24.30.192:28080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String topic = "zhurunhua-test";
        // In most case, you don't need to create too many producers, singleton pattern is recommended.
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic
                // route before message publishing.
                .setTopics(topic)
                // May throw {@link ClientException} if the producer is not initialized.
                .build();
        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "new-client-test";
        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("test")
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (Throwable t) {
            System.err.println(t);
        }
        // Close the producer when you don't need it anymore.
        producer.close();
    }
}

消费者:

package cn.xdf.xadd.rmq.test.newclient;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "172.24.30.192:28080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "new-client-test";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "new-client-test-group";
        String topic = "zhurunhua-test";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the consumer group name.
                .setConsumerGroup(consumerGroup)
                // Set the subscription for the consumer.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    // Handle the received message and return consume result.
                    log.info("Consume message={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // Block the main thread, no need for production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when you don't need it anymore.
        pushConsumer.close();
    }
}

以上,搭建成功,消息生产消费成功~

遇到的问题:

找到rmq.proxy,logback.xml,批量修改对应的路径:

将${brokerLogDir}批量替换成自定义的路径即可。

dashboard目前还不能采集到客户端的信息:


http://www.kler.cn/a/4959.html

相关文章:

  • Ubuntu | PostgreSQL | 解决 ERROR: `xmllint` is missing on your system.
  • 12_Redis发布订阅
  • 【数据链电台】洛克希德·马丁(Lockheed Martin)
  • 【C++】C++11(二)
  • java中json字符串键值获取
  • 微信小程序实现长按录音,点击播放等功能,CSS实现语音录制动画效果
  • 【共创共赢】AntDB数据库合作伙伴交流会(北京站)顺利举办
  • NVT | NT96660 NVTIPC库应用说明
  • GO实现Redis:GO实现Redis的AOF持久化(4)
  • Ubuntu22.04部署Kubernetes集群(亲测可用)
  • GROUP_CONCAT的进阶使用
  • TryHackMe-Madeye‘s Castle(boot2root)
  • 基于springboot和vue实现地方美食分享网站演示【附项目源码】分享
  • 2023版信息系统项目管理师考试大纲
  • Python 装饰器
  • 基于DDS的SOA测试方案实现
  • mmdetection3d可视化多模态模型推理结果
  • 瑞萨G2UL工业核心板内存测试,您想了解的内容全都有
  • Linux介绍
  • LLVM 的中间代码(IR) 基本语法
  • 20230327----重返学习-轮播图-function的ES6变量提升问题
  • WebKitX ActiveX 6.0 X86 Crack
  • 隐私计算具体用java 如何实现
  • Nginx学习(11)—— Nginx源码架构、configure是怎么执行的(编译的具体细节)
  • 大学计划|关于举办《数字化转型赋能教育创新发展高峰论坛》的通知
  • 第二个项目 基于React技术学习的pc端项目