当前位置: 首页 > article >正文

zk之数据的发布与订阅

数据的发布和订阅:

(1)数据的发布与订阅是一个一对多的关系。多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使它们能够自动的更新自己的状态。发布和订阅可以让发布方和订阅放独立封装。
(2)当一个对象改变的时候,需要同时改变其它的对象,而且不知道多少个对象需要改变时,那么就可以使用发布和订阅模式。数据的发布和订阅在分布式中的应用主要有配置管理,和服务发现。
(3)配置管理是指如果集群中的机器拥有某些相同的配置时,并且这些配置信息需要动态的改变,我们就可以使用数据的发布和订阅模式把配置做统一的管理。让这些机器各自订阅配置信息的改变。当配置发生改变的时候,这些机器就可以得到通知。并且更新为最新的配置。
(4)服务发现是指,对集群中的服务上下线做统一的管理,每台服务器都可以作为数据的发布方,向集群注册自己的基本信息。而让某些监控服务器作为订阅工作服务器的基本信息。
(5)当工作服务器的基本信息发生改变的时候,比如说上下线,服务器角色改变,服务范围的变更,那么监控服务器可以得到通知,并且响应这些变化。

基本模型如下

​​​​​​​​​​​​在这里插入图片描述
左侧浅紫色的区域代表的是zk集群,右侧的方块代表的是工作服务器集群。其中,前3个方块代表的是工作服务器。绿色的方块代表的是管理服务器。最下面的方块代表的是控制服务器。Zk中有三类的节点,首先是config节点,它用于我们的配置管理,manageServer可以通过config来下发配置信息。workServer可以通过订阅config来改变更新自己的配置信息。

Servers节点用于服务发现,每个workServer在启动的时候,都会在Servers下创建一个临时节点,manager节点充当的monitor,监控servers节点下的子节点的改变,来更新工作服务器的列表信息。最后我们可以通过control Server,由command节点作为中介向manageServer发送控制指令。controlServer向command节点写入控制信息,manageServer订阅command节点的数据改变,来监听并且执行命令。

代码基本流程图

1.manage server 程序主体工作流程

在这里插入图片描述

2.work server 程序主体流程

在这里插入图片描述

3.系统核心类基本模型

在这里插入图片描述
serverConfig 用来记录workServer的配置信息
serverData 用来记录workServer的基本信息
subscribeZkClient 作为整个类的入口,用来启动workServer和manageServer.
demo 如下:

/**
 * 下面demo就是一个典型的发布订阅系统:
 * 集群中每台机器在启动阶段,都会到该节点上获取数据库的配置信息,同时客户端还需要在在节
 * 点注册一个数据变更的watcher监听,一旦该数据节点发生变更,就会受到通知信息。
 */
public class ConfigTest {
    /**
     * 配置中心父节点
     */
    private static final String PATH = "/server/database_config";
    private static final String zkAddress = "127.0.0.1:2181";
    private static final int timeout = 1000;

    private static CuratorFramework client = null;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 客户端的连接状态监听
     */
    static ConnectionStateListener clientListener = new ConnectionStateListener() {

        public void stateChanged(CuratorFramework client,
                                 ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                System.out.println("connected established");
                countDownLatch.countDown();
            } else if (newState == ConnectionState.LOST) {
                System.out.println("connection lost,waiting for reconection");
                try {
                    System.out.println("reinit---");
                    reinit();
                    System.out.println("inited---");
                } catch (Exception e) {
                    System.err.println("re-inited failed");
                }
            } else if (newState == ConnectionState.SUSPENDED) {
                System.out.println("suspended");
            } else {
                System.out.println(newState);
            }

        }
    };

    public static void main(String[] args) throws Exception {
        //1、初始化curator
        init();
        //2、判断父节点是否存在,不在的话创建该节点(这个节点本身应在服务器手动添加的)
        Stat stat = client.checkExists().forPath(PATH);
        if (stat == null) {
            client.create().creatingParentsIfNeeded().forPath(PATH);
        }
        //3、对path的变更进行监听
        watcherPath(PATH, pathWatcher);
        //4、模拟阻塞场景,可以客户端改变数据测试
        Thread.sleep(Integer.MAX_VALUE);
    }


    public static void init() throws Exception {
        client = CuratorFrameworkFactory.builder().connectString(zkAddress)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new RetryNTimes(5, 5000)).build();
        // 客户端注册连接状态监听器,进行连接配置(客户端连接的状态会被相应 的监听器监听)
        client.getConnectionStateListenable().addListener(clientListener);
        client.start();
        // 连接成功后,才进行下一步的操作(连接成功会触发监听器中的countDownLatch.await())
        countDownLatch.await();
    }

    public static void reinit() {
        try {
            unregister();
            init();
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    public static void unregister() {
        try {
            if (client != null) {
                client.close();
                client = null;
            }
        } catch (Exception e) {
            System.out.println("unregister failed");
        }
    }

    /**
     * 对path进行监听配置
     *
     * @param path
     * @param watcher
     * @return
     * @throws Exception
     */
    public static String watcherPath(String path, CuratorWatcher watcher)
            throws Exception {
        //只是改变数据时候会反应(或者是使用getChildren、exist。或者是在创建客户端构造函数进行watcher监听)
        byte[] buffer = client.getData().usingWatcher(watcher).forPath(path);

        System.out.println("获取节点的信息:" + new String(buffer));
        return new String(buffer);
    }

    /**
     * 读取path数据
     *
     * @param path
     * @return
     * @throws Exception
     */
    public static String readPath(String path) throws Exception {
        byte[] buffer = client.getData().forPath(path);
        return new String(buffer);

    }

    /**
     *
     * 对path进行改变监听的watcher
     */
    private static CuratorWatcher pathWatcher = new CuratorWatcher() {
        public void process(WatchedEvent event) throws Exception {
            System.out.println(event.getType());
            // 当数据变化后,重新获取数据信息
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                //获取更改后的数据,进行相应的业务处理
                String value = readPath(event.getPath());
                System.out.println(value);
            }

        }
    };

}```


http://www.kler.cn/a/17526.html

相关文章:

  • 大数据 ETL + Flume 数据清洗 — 详细教程及实例(附常见问题及解决方案)
  • Java复习42(PTA)
  • C# 集合与泛型
  • C++开发基础之使用librabbitmq库实现RabbitMQ消息队列通信
  • 智慧仓储物流可视化平台
  • FMC 扩展子卡6 路 422,8 组 LVDS,8 路 GPIO
  • RBF-UKF径向基神经网络结合无迹卡尔曼滤波估计锂离子电池SOC(附MATLAB代码)RBF神经网络训练部分
  • 有趣的回文检测
  • 如何用ChatGPT做团队绩效管理?根据员工的个人优势、不足、目标来生成更具体的绩效反馈
  • 2023/5/7周报
  • 猫狗训练集训练报错:Failed to find data adapter that can handle input
  • C++ Primer第五版_第十五章习题答案(21~30)
  • Spring Boot 整合 Swagger 教程详解
  • ChatGPT写文章效果-ChatGPT写文章原创
  • leetcode 530. 二叉搜索树的最小绝对差
  • 《互联网安全产品漏洞管理规定》
  • 【Linux Network】网络编程套接字
  • 轻松掌握在已有K8s环境上安装KubeSphere
  • 【五一创作】Qt quick基础1(包含基本元素Text Image Rectangle的使用)
  • HTTP加密
  • 身份鉴别解读与技术实现分析(1)
  • 【Linux】多路转接--select、poll、epoll,非阻塞等待
  • 超大excel文件读,避免内存溢出
  • 【华为OD机试真题 Python】简单的解压缩算法 (100%通过)
  • node之Express
  • 【GAMES101】05 Rasterization(Triangles)