当前位置: 首页 > article >正文

快速入门Zookeeper

Zookeeper

ZooKeeper作为一个强大的开源分布式协调服务,扮演着分布式系统中至关重要的角色。它提供了一个中心化的服务,用于维护配置信息、命名、提供分布式同步以及提供组服务等。通过其高性能和可靠的特性,ZooKeeper能够确保在复杂的分布式环境中,各个节点和服务之间的协调和通信得以顺利进行。无论是在大规模的Web服务、云基础设施管理,还是大数据处理框架中,ZooKeeper都是实现服务发现、负载均衡、数据发布/订阅等关键功能的首选工具。它的出现极大地简化了分布式系统设计中的一致性问题,使得开发者可以更加专注于业务逻辑的实现。

在这里插入图片描述

安装配置

学习笔记:快速入门ZooKeeper技术

zookeeper命令操作

zookeeper数据模型
在这里插入图片描述
服务端命令操作
• 启动 ZooKeeper 服务: ./zkServer.sh start
• 查看 ZooKeeper 服务状态: ./zkServer.sh status
• 停止 ZooKeeper 服务: ./zkServer.sh stop
• 重启 ZooKeeper 服务: ./zkServer.sh restart
客户端命令操作
在这里插入图片描述
连接ZooKeeper服务端:

./zkCli.sh –server ip:port

在这里插入图片描述

断开链接

quit

在这里插入图片描述
查看节点信息

ls /dubbo

在这里插入图片描述
创建节点并赋值(没有加任何参数就是持久化的

create /节点path value

在这里插入图片描述

设置节点值

set /节点path value

在这里插入图片描述
删除节点值

delete /path

在这里插入图片描述
不能重复创建节点,但是可以创建子节点往下延申

create /app1/p1
create /app1/p2

在这里插入图片描述
子节点存在的时候,不允许直接删除父节点
直接删除子节点的节点

deleteall /节点path

在这里插入图片描述
创建临时顺序节点

create -e /节点path value

退出会话,重新打开xshell查询会发现临时节点没有了

创建持久化顺序节点

create -s /节点path value

如果生成多次,会发现顺序产生多个节点
在这里插入图片描述
查看创建顺序临时节点

create -es /节点路径

在这里插入图片描述
在这里插入图片描述

查看节点详细信息

ls -s /节点path(ls2 /)

之前使用过Dubbo,这里可以看一下服务提供方的ip地址
在这里插入图片描述
在这里插入图片描述

JAVA API操作

在这里插入图片描述注意
因为Curator是ZooKeeper的Java客户端库,所以二者的版本要对应起来可以看下官网的说明。
Zookeeper 3.5X 的版本可以用Curator4.0版本
低版本的Curator不能用高版本的Zookeeper,反之则可以。

导入pom.xml

<dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>

        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

建立链接

public class CuratorTest {
    /**
     * 建立连接
     */
    @Test
    public void testConnection() {
        // 1.第一种方式
        // 重试策略,该策略重试设定的次数,每次重试之间的睡眠时间都会增加
        /**
         * 参数:
         * baseSleepTimeMs – 重试之间等待的初始时间量
         * 最大重试 次数 – 重试的最大次数
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        /**
         * Create a new client
         *
         * @param connectString       list of servers to connect to 连接字符串,地址+端口 例如“192. 168.149.135:2181, 192.168.149.135”
         * @param sessionTimeoutMs    session timeout 会话超时时间 单位毫秒
         * @param connectionTimeoutMs connection timeout 连接超时时间 单位毫秒
         * @param retryPolicy         retry policy to use 策略
         * @return client
         */

        CuratorFramework client =
                CuratorFrameworkFactory.newClient("192. 168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);

        client.start();
    }
}
 /**
     * 建立连接 方式二
     */
    @Test
    public void testConnection2() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        // 第二种方式
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("itheima");;
        
        builder.build().start();

    }

注意这里建议把namespace加上,相当于是根目录
在这里插入图片描述
添加节点

public class CuratorTest {
	// 声明为成员变量 
    private CuratorFramework client;
    /**
     * 建立连接 方式二
     */
    @Before
    public void testConnection2() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        // 第二种方式
        client = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("itheima").build();

        client.start();
    }

    /**
     * 创建节点
     * 持久、临时、顺序
     * 设置数据
     * <p>
     * 1.基本创建
     * 2.创建节点,带有数据
     * 3.设置节点的类型
     * 4.创建多级节点
     */
    @Test
    public void testCreate() throws Exception {
        // client操作
    }

    @After
    /**
     * 关闭连接
     */
    public void close() {
        if (null != client) {
            client.close();
        }
    }
}

(1).创建一个基础的节点
我们先创建一个基础的节点
CuratorTest.java

    @Test
    public void testCreate() throws Exception {
        String path = client.create().forPath("/app4");
        System.out.println(path);
    }

(2).创建一个带有数据的节点
CuratorTest.java

    @Test
    public void testCreateValue() throws Exception {
        String path = client.create().forPath("/app5", "heima".getBytes());
        System.out.println(path);
    }

(3).设置节点的类型
CuratorTest.java
创建临时节点

    /**
     * 设置节点类型
     *
     * @throws Exception
     */
    @Test
    public void testCreateType() throws Exception {
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app6", "number6".getBytes());
        System.out.println(path);
    }

(4).创建多级节点
CuratorTest.java

 /**
     * 多级节点
     *
     * @throws Exception
     */
    @Test
    public void testCreateManyTypes() throws Exception {
        // creatingParentsIfNeeded()如果父节点不存在,就创建一个节点
        String path = client.create().creatingParentsIfNeeded().forPath("/app8/p8", "number8".getBytes());
        System.out.println(path);
    }

查询节点
(1).查询数据
CuratorTest.java

    /**
     * 获取、查询
     * 1.查询数据
     * 2.查询子节点
     * 3.查询节点的状态 ls -s
     */
    @Test
    public void testGet() throws Exception {
        byte[] bytes = client.getData().forPath("/app7");
        System.out.println(new String(bytes));
    }

(2).查询子节点
CuratorTest.java

    /**
     * 获取、查询
     * 2.查询子节点
     */
    @Test
    public void testGetSon() throws Exception {
        List<String> path = client.getChildren().forPath("/app8");
        System.out.println(path);
    }

(3).查询节点的状态 ls -s
CuratorTest.java

    /**
     * 3.查询节点的状态 ls -s
     */
    @Test
    public void testGetStatus() throws Exception {
        Stat stat = new Stat();
        System.out.println("查询前:" + stat);
        client.getData().storingStatIn(stat).forPath("/app8/p8");
        System.out.println("查询后:" + stat);
    }

修改节点
修改数据
修改前用Zookeeper查询

    /**
     * 修改数据
     *
     * @throws Exception
     */
    @Test
    public void testSet() throws Exception {
        client.setData().forPath("/app7", "dong77".getBytes());
    }

根据版本修改数据⭐(推荐)
CuratorTest.java

    /**
     * 按照版本修改
     *
     * @throws Exception
     */
    @Test
    public void testSetForVersion() throws Exception {
        int version = 0;
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app7");
        version = stat.getVersion();
        System.out.println("当前version是:" + version);
        client.setData().withVersion(version).forPath("/app7", "luka7".getBytes());
    }

删除节点

    /**
     * 1.删除单个节点
     *
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception {
        client.delete().forPath("/app5");
    }

删除带有子节点的节点
删除前节点app7是有子节点的

    /**
     * 2.删除带有子节点的节点
     *
     * @throws Exception
     */
    @Test
    public void testDeleteWithSon() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/app7");
    }

必须成功的删除节点⭐(推荐):防止网络抖动,本质抖动。
删除前app4还在

    /**
     * 3.必须成功的删除节点
     *
     * @throws Exception
     */
    @Test
    public void testDeleteSucceed() throws Exception {
        client.delete().guaranteed().forPath("/app4");
    }

回调
删除前有节点

/**
     * 4.回调
     *
     * @throws Exception
     */
    @Test
    public void testDeleteReturnFun() throws Exception {
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("我被删除了");
                System.out.println(event);
            }
        };
        client.delete().inBackground(callback).forPath("/app8");
    }

Watch事件监听

在这里插入图片描述
(1).NodeCache给指定一个节点注册监听器
用代码实现一下:
命名空间下面可能没有子节点的话,过一会儿就会删除命名空间
CuratorTest.java

 /**
     * NodeCache : 只是监听某一个特定的节点
     *
     * @throws Exception
     */
    @Test
    public void testNodeCache() throws Exception {
        while (true) {
            /**
             * Params:
             * client – curztor client 客户端
             * path – the full path to the node to cache  要缓存的节点的完整路径
             * dataIsCompressed – if true, data in the path is compressed 如果为 true,则路径中的数据被压缩,默认不压缩
             */
            // 1.创建NodeCache对象
            NodeCache nodeCache = new NodeCache(client, "/app1");

            // 2.注册监听8
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("节点变化了");
                }
            });

            // 3.开启监听 如果为 true,将在此方法返回之前调用, rebuild() 以便获得节点的初始视图
            nodeCache.start(true);
        }

    }

现在想知道节点变成了什么,这里需要修改代码
CuratorTest.java

 /**
     * NodeCache : 只是监听某一个特定的节点
     *
     * @throws Exception
     */
    @Test
    public void testNodeCache() throws Exception {

        /**
         * Params:
         * client – curztor client 客户端
         * path – the full path to the node to cache  要缓存的节点的完整路径
         * dataIsCompressed – if true, data in the path is compressed 如果为 true,则路径中的数据被压缩,默认不压缩
         */
        // 1.创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client, "/app1");

        // 2.注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了");
                // 获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println("修改后数据是:" + new String(data));
            }
        });

        // 3.开启监听 如果为 true,将在此方法返回之前调用, rebuild() 以便获得节点的初始视图
        nodeCache.start(true);
        while (true) {
        }


    }

(2).PathChildrenCache(监听某个节点的子节点们 )
测试前先给app1节点创建3个子节点

/**
     * PathChildrenCache : 监控一个ZNode的所有子节点.
     *
     * @throws Exception
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        /**
         *  如果为 true,则除了统计信息之外,还会缓存节点内容
         */
        // 1.创建监听对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", true);

        // 2.绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点变化了");
                System.out.println(event);
            }
        });
        pathChildrenCache.start();

        while (true) {

        }
    }

那现在我想看到具体子节点的变化情况和变化的值呢。修改CuratorTest.java修改PathChildrenCache构造函数,最后一个值是false,然后输出监听信息。

/**
     * PathChildrenCache : 监控一个ZNode的所有子节点.
     *
     * @throws Exception
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        /**
         *  如果为 true,则除了统计信息之外,还会缓存节点内容
         */
        // 1.创建监听对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", false);

        // 2.绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点变化了");
                System.out.println(event);

                // 1.获取类型
                PathChildrenCacheEvent.Type type = event.getType();
                // 2.判断类型是否是update
                if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("数据被更新了:" + new String(bytes));
                } else if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("数据被添加了:" + new String(bytes));
                } else if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("数据被删除了:" + new String(bytes));
                }

            }
        });
        pathChildrenCache.start();

        while (true) {

        }
    }

(3).TreeCache(监听某个节点自己和所有的子节点们)
CuratorTest.java

/**
     * TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
     *
     * @throws Exception
     */
    @Test
    public void testTreeCache() throws Exception {
        TreeCache treeCache = new TreeCache(client, "/app1");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("整个树节点有变化");
                System.out.println(event);
            }
        });
        treeCache.start();

        while (true) {
            
        }

    }

也想输出树节点的信息,继续修改CuratorTest.java

 /**
     * TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
     *
     * @throws Exception
     */
    @Test
    public void testTreeCache() throws Exception {
        TreeCache treeCache = new TreeCache(client, "/app1");
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("整个树节点有变化");
                System.out.println(event);

                TreeCacheEvent.Type type = event.getType();
                if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("树节点数据被更新了:" + new String(bytes));
                } else if (type.equals(TreeCacheEvent.Type.NODE_ADDED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("树节点数据被添加了:" + new String(bytes));
                } else if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
                    byte[] bytes = event.getData().getData();
                    System.out.println("树节点数据被删除了:" + new String(bytes));
                } else {
                    System.out.println("树节点无操作");
                }

            }
        });
        treeCache.start();

        while (true) {

        }

    }

分布式锁

在这里插入图片描述
原理
当客户端想要获得锁,则创建节点,使用完锁,则删除该节点。
在这里插入图片描述
模拟12306售票案例

在这里插入图片描述
在这里插入图片描述

创建一个测试类LockTest.java

package com.itheima;

/**
 * @ClassName: LockTest
 * @Description:
 * @Author: wty
 * @Date: 2023/3/14
 */

public class LockTest {
    public static void main(String[] args) {
        Tick12306 tick12306 = new Tick12306();
        // 创建客户端
        Thread t1 = new Thread(tick12306, "携程");
        Thread t2 = new Thread(tick12306, "飞猪");
        Thread t3 = new Thread(tick12306, "去哪儿");
        
        t1.start();
        t2.start();
        t3.start();
    }
}

创建实体类模拟12306抢票操作

package com.itheima;

/**
 * @ClassName: Tick12306
 * @Description:
 * @Author: wty
 * @Date: 2023/3/14
 */

public class Tick12306 implements Runnable {
    // 数据库的票数
    private int tickets = 100;

    @Override
    public void run() {
        while (true) {
            if (tickets > 0) {
                System.out.println("线程:" + Thread.currentThread().getName() + "买走了票,剩余" + (--tickets));
            }
        }
    }
}

解决方案:用分布式锁解决
添加工具类ClientConnection.java

public class ClientConnection {
    public static CuratorFramework getConnection() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000).retryPolicy(retry).build();
        client.start();
        return client;
    }
}

修改Tick12306.java

public class Tick12306 implements Runnable {
    private InterProcessMutex lock;
    // 数据库的票数
    private int tickets = 100;

    public Tick12306() {
        CuratorFramework client = ClientConnection.getConnection();
        lock = new InterProcessMutex(client, "/itheima/lock");
    }

    @Override
    public void run() {
        while (true) {
            // 获取锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    System.out.println("线程:" + Thread.currentThread().getName() + "买走了票,剩余" + (--tickets));
                    Thread.sleep(100);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

zookeeper 集群

在这里插入图片描述在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183

修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
模拟集群异常
(1)首先我们先测试如果是从服务器挂掉,会怎么样
把3号服务器停掉,观察1号和2号,发现状态并没有变化。
在这里插入图片描述
在这里插入图片描述

由此得出结论,3个节点的集群,从服务器挂掉,集群正常
(2)我们再把1号服务器(从服务器)也停掉,查看2号(主服务器)的状态,发现已经停止运行了。
在这里插入图片描述
由此得出结论,3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。
(3)我们再次把1号服务器启动起来,发现2号服务器又开始正常工作了。而且依然是领导者。
在这里插入图片描述
(4)我们把3号服务器也启动起来,把2号服务器停掉,停掉后观察1号和3号的状态。
在这里插入图片描述
(5)我们再次测试,当我们把2号服务器重新启动起来启动后,会发生什么?2号服务器会再次成为新的领导吗?结果2号服务器启动后依然是跟随者(从服务器),3号服务器依然是领导者(主服务器),没有撼动3号服务器的领导地位。由此我们得出结论,当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者

集群角色

在这里插入图片描述


http://www.kler.cn/a/392518.html

相关文章:

  • 详解基于C#开发Windows API的SendMessage方法的鼠标键盘消息发送
  • C#文字识别API场景解析、表格识别提取
  • 2411C++,C++26反射示例
  • -1大于4?负数与无符号整数类型:size_t的比较问题(strlen)
  • leetcode hot100【LeetCode 114.二叉树展开为链表】java实现
  • GaussDB部署架构
  • 机器学习在医疗健康领域的应用
  • 【C++】类与对象的基础概念
  • Python网络爬虫与数据采集实战——什么是网络爬虫
  • PostgreSQL 锁判断
  • 继承和多态(上)
  • Kafka 之自定义消息拦截器【Kafka 拦截器】
  • 牛客小白月赛104-D小红开锁-模拟
  • Unity常见问题合集(一)
  • workerman的安装与使用
  • TCP/IP协议,TCP和UDP区别
  • L10.【LeetCode笔记】回文链表
  • QObject中QThreadData里面的postEventList和QObjectPrivate里面的postedEvents
  • caozha-comment(原生PHP评论系统)
  • 根据模型数据 处理流式数据 生成AI对话
  • [运维][Nginx]Nginx学习(1/5)--Nginx基础
  • QTableWidget的简单使用
  • Swift 开发教程系列 - 第11章:内存管理和 ARC(Automatic Reference Counting)
  • Redhat8.6安装MySQL8.0.31
  • 在启动 Spring Boot 项目时,报找不到 slf4j 的错误
  • openresty入门教程:access_by_lua_block