当前位置: 首页 > article >正文

RocketMQ中的NameServer主要数据结构

1.前言

NameServer是RocketMQ中的一个比较重要的组件,我们这篇博客针对NameSever中包含的组件进行分析,分析一下NameServer中包含的组件以及组件的作用。以前我有一篇博客中rocketMq源码分析之搭建本地环境-CSDN博客,在这篇博客中就简单看了下NameSever中会有两个组件:NamesrvConfig和NettyServerConfig。在这里就不在进行介绍。

2.KVConfigManager

这个KVConfigManager的核心作用是提供一个统一的地方来存储和管理一些全局的配置信息,这些配置信息以键值对的形式存在,不同的业务模块或者不同的命名空间可以使用这些配置来完成特定的任务。

private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

 

里面存放了一个HashMap用来存放一个一个键值对。主要包含putKVConfig方法,getKVConfig方法以及persist(持久化到磁盘)方法。

3.RouteInfoManager

这个RouteInfoManager是NameServer中最重要的一个组件,主要负责管理和维护整个 RocketMQ 集群的路由信息,核心作用是维护 RocketMQ 集群中各种组件(如 Broker、Topic 等)的路由元数据信息,使得生产者和消费者能够准确地找到所需的消息队列,从而实现消息的正确发送和消费。

下面是RouteInfoManager中的主要数据结构:

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // broker长连接过期时间 长连接的空闲时间是2分钟
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // 创建topic 以后 topic是逻辑上的概念 一个topic会有多个Queue Queue会分散到不同的broker上
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    //  代表的broker组的信息 BrokerData包含了一组Broker的信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // 一个NameServer可以管理多个broker组 通常来说一个Cluster就可以了
    // 有可能会有很多复杂的业务场景 多个Cluster
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //管理Broker的长连接心跳 是否还有心跳
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Filter Server 是rocketMQ的一个高级功能,用来过滤消息
    //一般情况下 我们是可以基于tag进行数据筛选的操作,比较简单,没有办法进行更加细化的过滤
    //这个Filter Server是在每台Broker机器上启动一个(或者多个)Filter Server
    //我们可以把一个自定义的消息筛选的class 上传到Filter server上,在进行数据消费的时候让Broker把数据先传输到Filter Server
    // Filter Server会根据你自定义的class来进行细粒度的数据筛选,把筛选后的数据回传给你的消费端
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

这个RouteInfoManager会有一些比较核心的方法,

1.getAllClusterInfo 获取集群的信息

    /**
     * 返回的是 broker的cluster信息
     * 里面包含的是HashMap<String //brokerName//  BrokerData> brokerAddrTable
     * HashMap<String  //clusterName// , Set<String //brokerName// >> clusterAddrTable
     * @return
     */
    public ClusterInfo getAllClusterInfo() {
        ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
        clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
        clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
        return clusterInfoSerializeWrapper;
    }

2.deleteTopic 删除对应的topic信息


    /**
     * 删除某个topic 直接操作topicQueueTable的hashMap
     * @param topic
     */
    public void deleteTopic(final String topic) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.topicQueueTable.remove(topic);
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("deleteTopic Exception", e);
        }
    }

3.getAllTopicList 查看所有的topic信息

/**
     * 查询所有的topic的列表信息
     * @return
     */
    public TopicList getAllTopicList() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }

        return topicList;
    }

4.registerBroker 注册broker方法

  /**
     * broker的注册方法
     * @param clusterName broker的集群名称
     * @param brokerAddr broker的地址
     * @param brokerName broker所属组的名称
     * @param brokerId   broker机器的id
     * @param haServerAddr broker的ha地址
     * @param topicConfigWrapper 当前broker机器上包含的topic队列上的数据
     * @param filterServerList broker上部署的filterServer的列表
     * @param channel netty的网络长连接
     * @return broker注册的结果
     */
    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
  
  

4.总结

老规矩,我们用一张图来进行总结一下这个文章的核心内容。


http://www.kler.cn/a/530048.html

相关文章:

  • 用一个例子详细说明python单例模式
  • 基于VMware的ubuntu与vscode建立ssh连接
  • [LeetCode]day10 707.设计链表
  • 基于微信小程序的医院预约挂号系统设计与实现(LW+源码+讲解)
  • 告别复杂,拥抱简洁:用plusDays(7)代替plus(7, ChronoUnit.DAYS)
  • lstm代码解析1.2
  • 网站快速收录:利用网站作者信息提升权重
  • ROS-SLAM
  • DeepSeek-R1模型1.5b、7b、8b、14b、32b、70b和671b有啥区别?
  • 25.2.2学习内容
  • C++11新特性之范围for循环
  • 使用 HTTP::Server::Simple 实现轻量级 HTTP 服务器
  • kamailio-kamctl monitor解释
  • 面经--C语言——sizeof和strlen,数组和链表,#include <>和 #include ““ #define 和typedef 内存对齐概述
  • Pluto固件编译笔记
  • C#,shell32 + 调用控制面板项(.Cpl)实现“新建快捷方式对话框”(全网首发)
  • Rust 函数使用详解
  • solidity高阶 -- Eth支付
  • 快速提升网站收录:利用网站用户反馈机制
  • Python函数基础
  • 系统思考—决策
  • Python 深拷贝与浅拷贝:数据复制的奥秘及回溯算法中的应用
  • deepseek+vscode自动化测试脚本生成
  • Error: Expected a mutable image
  • C++:抽象类习题
  • 用Python替代OpenMV IDE显示openmv USB 图像