Java代码操作ZooKeeper(使用原生 ZooKeeper 客户端库)
1.6.1 连接Zookeeper
1.6.1.1 添加 Maven 依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
1.6.1.2 编写 Java 代码
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ConnectZookeeper {
private static final String ZK_HOST = "192.168.200.138:2181";
private static final int CONNECT_TIMEOUT = 20000; // 设置超时时间需要合理
public static void main(String[] args) throws IOException, InterruptedException {
// 创建一个ZooKeeper客户端实例
ZooKeeper zk = new ZooKeeper(ZK_HOST, CONNECT_TIMEOUT, event -> {
System.out.println("Watcher的process被调用了");
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
if (event.getState() == Watcher.Event.KeeperState.Closed) {
System.out.println("连接关闭");
}
});
System.out.println("客户端开始连接ZK服务器");
// 等待连接到ZooKeeper
ZooKeeper.States state = zk.getState();
System.out.println(state);
Thread.sleep(20000);
state = zk.getState();
System.out.println(state);
Thread.sleep(20000);
zk.close();
}
}
可以看到如下运行结果:
客户端开始连接ZK服务器
CONNECTING
Watcher的process被调用了
连接成功
CONNECTED
Watcher的process被调用了
连接关闭
1.6.2 增删改查操作及Watcher监听
import org.apache.zookeeper.*;
import java.io.IOException;
public class ZookeeperCRUD {
private static final String ZK_HOST = "192.168.200.138:2181";
private static final int CONNECT_TIMEOUT = 20000; // 设置超时时间需要合理
private static final String PATH = "/my_crud_node";
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
// 创建一个ZooKeeper客户端实例
ZooKeeper zk = new ZooKeeper(ZK_HOST, CONNECT_TIMEOUT, event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
// 监听数据变化
if (event.getType()== Watcher.Event.EventType.NodeDataChanged) {
System.out.println("数据被改变");
}
if (event.getType()== Watcher.Event.EventType.NodeDeleted) {
System.out.println("节点已删除");
}
if (event.getState() == Watcher.Event.KeeperState.Closed) {
System.out.println("连接关闭");
}
});
ZooKeeper.States state = zk.getState();
System.out.println(state);
Thread.sleep(20000);
state = zk.getState();
System.out.println(state);
Thread.sleep(2000);
//对节点进行增删改查
// PERSISTENT 节点是一种持久化的节点类型,它在客户端会话结束时不会自动删除,而是会一直保留在 ZooKeeper 中,直到被显式删除
zk.create(PATH, "I like ZooKeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
byte[] data;
data = zk.getData(PATH, true, null);
System.out.println(new String(data));
zk.setData(PATH, "I like Java".getBytes(), -1);
// 第二个参数设置成true,表示开启监听
data = zk.getData(PATH, true, null);
System.out.println(new String(data));
zk.delete(PATH, -1);
zk.close();
}
}