ZooKeeper 客户端API操作
文章目录
- 一、节点信息
- 1、创建节点
- 2、获取子节点并监听节点变化
- 3、判断节点是否存在
- 4、客户端向服务端写入数据
- 写入请求直接发给 Leader 节点
- 写入请求直接发给 follow 节点
- 二、服务器动态上下线监听
- 1、监听过程
- 2、代码
- 三、分布式锁
- 1、什么是分布式锁?
- 2、Curator 框架实现分布式锁
一、节点信息
前提:centos102、centos103、centos104 服务器都已经开启
pom.xml 依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
log4j.properties 配置
# 设置全局的日志记录级别为 INFO
log4j.rootLogger=INFO, stdout
# 控制台输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# 文件输出
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
1、创建节点
zkClient.java 代码
// 注意:逗号后面不能有空格
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
// 创建客户端
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 创建子节点
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreated = zkClient.create("/frost", "cat".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
运行创建子节点,看看是否创建了该节点
2、获取子节点并监听节点变化
@Test
public void getChildren() throws InterruptedException, KeeperException {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
}
那如果此时我再创建一个节点,此时控制台没有任何变化,我想要创建一个节点控制台能够看到相关变化怎么办?此时只需要将程序保持不结束,然后将客户端查看子节点函数放入监听器中。
3、判断节点是否存在
@Test
public void exit() throws InterruptedException, KeeperException {
Stat stat = zkClient.exists("/frost", false);
System.out.println(stat == null ? "not exits" : "exits");
}
4、客户端向服务端写入数据
写入请求直接发给 Leader 节点
- 客户端发送写入请求,leader节点执行写入操作
- leader通知follow1执行写入操作
- folllow1写入完毕给leader返回确认ack
- 现在半数以上服务器完成写入,leader给客户端发送确认ack
- leader通知follow2写入
- follow2写入完毕给leader发送确认ack
写入请求直接发给 follow 节点
- 客户端发送写入请求,
- follow1 将写入请求发送给leader
- leader节点执行写入操作,然后leader通知follow1执行写入操作
- folllow1写入完毕给leader返回确认ack
- 现在半数以上服务器完成写入,leader给follow1发送确认ack
- follow1给客户端发送确认ack
- leader通知follow2写入
- follow2写入完毕给leader发送确认ack
二、服务器动态上下线监听
1、监听过程
以下红色字体写错,应该是下线则通知注册监听器的客户端
对于ZooKeeper集群来说,客户端和服务器都相当于客户端,区别在于:服务器在ZooKeeper集群中是创建节点,客户端在ZooKeeper是监听信息。
2、代码
服务器注册到zk集群
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer server = new DistributeServer();
// 1. 获取zk连接
server.getConnect();
// 2. 注册服务器到 zk 集群
server.regist(args[0]);
// 3. 启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostname) throws InterruptedException, KeeperException {
String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 临时带序号的节点
System.out.println(hostname + "is online");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
}
客户端进行监听
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "centos102:2181,centos103:2181,centos104:2181";
private int sessionTimeout = 2000;
ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient client = new DistributeClient();
// 1. 获取zk连接
client.getConnect();
// 2. 监听/servers下子节点的增加和删除
client.getServerList();
// 3. 启动业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws InterruptedException, KeeperException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
启动客户端,然后在服务器上进行增加节点监听
删除节点监听
因为我们服务端的代码传参了,所以我们需要设置一下这个参数:
下图代表服务端启动的是hadoop102节点
先把客户端启动起来,发现有一个节点hadoop101:
在启动服务端,hadoop102上线:
然后返回看客户端的监听,发现节点有变化,打印出所有节点 [hadoop102, hadoop101]
此时我们修改一下再此启动服务端让 hadoop103 上线:
返回客户端查看发现 hadoop102 下线,hadoop103 上线
三、分布式锁
1、什么是分布式锁?
比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private final String connectString = "centos102:2181,centos103:2181,centos104:2181";
private final int sessionTimeout = 2000;
ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
// 前一个节点
private String waitPath;
// 当前节点
String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 1. 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// connectLatch,如果连接上zk,可以释放
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch,需要释放
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 等待zk正常连接后往下走
connectLatch.await();
// 2. 判断根节点/lock是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
// 创建根节点(永久节点)
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对 zk 加锁
public void zkLock() {
// 创建对应的临时带序号的节点
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断创建的节点是否是最小的序号节点,如果是,获取到锁;如果不是,监听前一个节点
List<String> children = zk.getChildren("/locks", false);
// 如果 children 只有一个节点,直接获取锁;如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
return;
}else {
// 排序
Collections.sort(children);
// 获取节点名称seq00000001
String thisNode = currentMode.substring("/locks/".length());
// 通过seq00000001获取该节点在children当中的位置
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
}else if (index == 0) {
// 该节点为第一个,获取锁直接返回
return;
}else {
// 不是第一个,监听前一个节点
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
// 等待监听结束
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁
public void unZkLock() throws InterruptedException, KeeperException {
// 删除节点
zk.delete(currentMode, -1);
}
}
测试
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程1启动,获取到锁");
Thread.sleep(5000);
lock1.unZkLock();
System.out.println("线程1释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程2启动,获取到锁");
Thread.sleep(5000);
lock2.unZkLock();
System.out.println("线程2释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
}
}
2、Curator 框架实现分布式锁
原生JAVA API出现的问题:
(1)会话是异步的,需要自己去连接
(2)Watch需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高
(4)不支持多节点的删除和创建,需要自己去递归
Curator 是一个专门解决分布式锁的框架,解决了原生JAVA API开发分布式遇到的的问题
curator 官方文档:https://curator.apache.org/index.html
pom.xml 文件添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程1获取到锁");
lock1.acquire();
System.out.println("线程1获取到锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程2获取到锁");
lock2.acquire();
System.out.println("线程2获取到锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("centos102:2181,centos103:2181,centos104:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 启动客户端
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}