当前位置: 首页 > article >正文

Java代码操作Zookeeper(使用 Apache Curator 库)

1. Zookeeper原生客户端库存在的缺点

  • 复杂性高:原生客户端库提供了底层的 API,需要开发者手动处理很多细节,如连接管理、会话管理、异常处理等。这增加了开发的复杂性,容易出错。
  • 连接管理繁琐:使用原生客户端库时,开发者需要手动管理与 ZooKeeper 的连接。这包括连接的建立、重连、会话超时处理等。
  • 异常处理复杂:原生客户端库的 API 抛出多种异常,如 KeeperExceptionInterruptedException 等。开发者需要手动处理这些异常,增加了代码的复杂性。
  • 缺少高级功能:原生客户端库缺少一些高级功能,如连接池管理、自动重试、负载均衡等。这些功能在实际应用中非常有用,但需要开发者自己实现或使用第三方库。
  • 缺少封装和抽象:原生客户端库提供了底层的 API,缺少更高层次的封装和抽象。开发者需要自己编写大量的代码来实现常见的功能,如分布式锁、配置管理等。
  • 性能调优困难:原生客户端库的性能调优需要开发者手动进行,如调整连接超时时间、会话超时时间等。这需要对 ZooKeeper 的工作原理有深入的理解。
  • 缺少社区支持:相比于一些更高级的客户端库(如 Curator),原生客户端库的社区支持相对较少。开发者在使用过程中遇到问题时,可能难以找到解决方案。

2. Apache Curator介绍

在这里插入图片描述

2.1 基本概述
  • 定义:Apache Curator是专为Apache ZooKeeper设计的Java/JVM客户端库,通过提供高级API框架及一系列实用工具,大幅降低使用ZooKeeper的复杂度并提升应用的可靠性。
  • 开发背景:Curator最初由Netflix公司开源,目前是Apache的顶级项目。
2.2 核心功能
  1. 高可用性连接管理:自动处理与ZooKeeper服务器的连接断开和重新连接,确保连接的稳定性和可靠性。
  2. 易于使用的API:封装复杂的ZooKeeper原语,提供更直观、简洁的使用方式,降低开发难度。
  3. 模式(Recipes):预置了一系列常见的分布式计算模式,如leader选举、分布式锁、缓存机制等,开发者可以快速实现这些分布式系统经典难题。
  4. 服务发现与负载均衡:支持动态的服务注册与发现,便于构建云原生应用,提高系统的可扩展性和灵活性。
  5. 异步DSL:针对Java 8及以上版本提供了异步编程的支持,提高了响应速度和程序效率。

3. 使用指南

3.1 添加 Maven 依赖
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
3.2 创建 Curator 客户端
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorExample {
    public static void main(String[] args) throws Exception {
        String connectString = "192.168.200.138:2181";
        String path = "/curator1";
        byte[] data = "myData".getBytes();

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(5000, 10);
        // 创建 Curator 客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);

        // 启动客户端
        client.start();

        // 创建节点
        client.create().withMode(CreateMode.PERSISTENT).forPath(path, data);

        // 获取节点数据
        byte[] retrievedData = client.getData().forPath(path);
        System.out.println("Retrieved data: " + new String(retrievedData));

        // 关闭客户端
        client.close();
    }
}
3.3 增删改查操作及Watcher监听
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class CuratorExample {
    public static void main(String[] args) throws Exception {
        String connectString = "192.168.200.138:2181";
        String path = "/curator1";
        byte[] data1 = "myData1".getBytes();
        byte[] data2 = "myData2".getBytes();

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(5000, 10);
        // 创建 Curator 客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);

        // 启动客户端
        client.start();

        // 增加临时监听操作
        client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event) -> {
            switch (event.getType()) {
                case WATCHED:
                    WatchedEvent watchedEvent = event.getWatchedEvent();
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                        System.out.println("监听的数据变化为: " + new String(c.getData().forPath(path)));
                        System.out.println("触发事件");
                    }
            }
        });

        // 创建节点
        client.create().withMode(CreateMode.PERSISTENT).forPath(path, data1);

        // 获取节点数据
        byte[] retrievedData = client.getData().watched().forPath(path);
        System.out.println("原始数据: " + new String(retrievedData));

        // 修改节点数据
        client.setData().forPath(path, data2);
        Thread.sleep(2000);

        // 删除节点
        client.delete().forPath(path);
        Thread.sleep(2000);
    }
}
3.4 进行永久监听
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class PermanentWatcherExample {
    public static void main(String[] args) throws Exception {
        String connectString = "192.168.200.138:2181";
        String path = "/curator1";
        byte[] data1 = "myData1".getBytes();
        byte[] data2 = "myData2".getBytes();
        byte[] data3 = "myData3".getBytes();

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(5000, 10);
        // 创建 Curator 客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);

        // 启动客户端
        client.start();

        // 永久监听
        client.create().withMode(CreateMode.PERSISTENT).forPath(path, data1);
        NodeCache nodeCache = new NodeCache(client, path);
        nodeCache.start();
        nodeCache.getListenable().addListener(() -> {
            ChildData currentData = nodeCache.getCurrentData();
            if (currentData != null) {
                System.out.println("触发了永久监听的回调,当前值为:" + new String(currentData.getData()));
            }
        });

        client.setData().forPath(path, data1);
        Thread.sleep(2000);

        client.setData().forPath(path, data2);
        Thread.sleep(2000);

        client.setData().forPath(path, data3);
        Thread.sleep(2000);

        client.delete().forPath(path);
    }
}
3.5 使用分布式锁
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedLockExample {
    public static void main(String[] args) throws Exception {
        String connectString = "192.168.200.138:2181";
        String path = "/myLock";

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(5000, 10);
        // 创建 Curator 客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);

        // 启动客户端
        client.start();

        // 创建分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, path);

        // 获取锁
        lock.acquire();
        try {
            // 执行临界区代码
            System.out.println("Lock acquired, executing critical section...");
            Thread.sleep(2000);
        } finally {
            // 释放锁
            lock.release();
            System.out.println("Lock released.");
        }

        // 关闭客户端
        client.close();
    }
}

http://www.kler.cn/a/417894.html

相关文章:

  • 支持向量机算法:原理、实现与应用
  • iQOO Neo10系列携三大蓝科技亮相,性能与续航全面升级
  • 【最新鸿蒙开发——应用导航设计】
  • Sybase数据恢复—Sybase数据库无法启动,Sybase Central连接报错的处理案例
  • Mybatis 支持延迟加载的详细内容
  • 程序设计 26种设计模式,如何分类?
  • HarmonyOS4+NEXT星河版入门与项目实战(24)------Stage模型
  • Flink CDC 使用实践以及遇到的问题
  • 基于VUE3集成天地图
  • 【Golang TCP应用】
  • 指针与引用错题汇总
  • upload-labs 靶场(11~21)
  • 网络安全之访问控制
  • 你的网站真的安全吗?如何防止网站被攻击?
  • 【AI】JetsonNano启动时报错:soctherm OC ALARM
  • 【ETCD】etcd中配置参数详解
  • 高级java每日一道面试题-2024年11月29日-JVM篇-常见调优工具有哪些?
  • org.apache.commons.lang3包下的StringUtils工具类的使用
  • Maven 内置绑定到底怎么回事?
  • QT 实现QStackedWidget切换页面开门动画
  • Linux如何安装git
  • pytorch 融合 fuse 学习笔记
  • Linux:进程间通信之进程池和日志
  • Ubuntu 环境下的 C/C++ 编译与调试配置
  • “移门缓冲支架:为家庭安全加码”
  • 以达梦为数据库底座时部署的微服务页面报乱码,调整兼容模式