快速入门Zookeeper
Zookeeper
ZooKeeper作为一个强大的开源分布式协调服务,扮演着分布式系统中至关重要的角色。它提供了一个中心化的服务,用于维护配置信息、命名、提供分布式同步以及提供组服务等。通过其高性能和可靠的特性,ZooKeeper能够确保在复杂的分布式环境中,各个节点和服务之间的协调和通信得以顺利进行。无论是在大规模的Web服务、云基础设施管理,还是大数据处理框架中,ZooKeeper都是实现服务发现、负载均衡、数据发布/订阅等关键功能的首选工具。它的出现极大地简化了分布式系统设计中的一致性问题,使得开发者可以更加专注于业务逻辑的实现。
安装配置
学习笔记:快速入门ZooKeeper技术
zookeeper命令操作
zookeeper数据模型
服务端命令操作
• 启动 ZooKeeper 服务: ./zkServer.sh start
• 查看 ZooKeeper 服务状态: ./zkServer.sh status
• 停止 ZooKeeper 服务: ./zkServer.sh stop
• 重启 ZooKeeper 服务: ./zkServer.sh restart
客户端命令操作
连接ZooKeeper服务端:
./zkCli.sh –server ip:port
断开链接
quit
查看节点信息
ls /dubbo
创建节点并赋值(没有加任何参数就是持久化的)
create /节点path value
设置节点值
set /节点path value
删除节点值
delete /path
不能重复创建节点,但是可以创建子节点往下延申
create /app1/p1
create /app1/p2
子节点存在的时候,不允许直接删除父节点
直接删除子节点的节点
deleteall /节点path
创建临时顺序节点
create -e /节点path value
退出会话,重新打开xshell查询会发现临时节点没有了
创建持久化顺序节点
create -s /节点path value
如果生成多次,会发现顺序产生多个节点
查看创建顺序临时节点
create -es /节点路径
查看节点详细信息
ls -s /节点path(ls2 /)
之前使用过Dubbo,这里可以看一下服务提供方的ip地址
JAVA API操作
注意
因为Curator是ZooKeeper的Java客户端库,所以二者的版本要对应起来可以看下官网的说明。
Zookeeper 3.5X 的版本可以用Curator4.0版本
低版本的Curator不能用高版本的Zookeeper,反之则可以。
导入pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
建立链接
public class CuratorTest {
/**
* 建立连接
*/
@Test
public void testConnection() {
// 1.第一种方式
// 重试策略,该策略重试设定的次数,每次重试之间的睡眠时间都会增加
/**
* 参数:
* baseSleepTimeMs – 重试之间等待的初始时间量
* 最大重试 次数 – 重试的最大次数
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/**
* Create a new client
*
* @param connectString list of servers to connect to 连接字符串,地址+端口 例如“192. 168.149.135:2181, 192.168.149.135”
* @param sessionTimeoutMs session timeout 会话超时时间 单位毫秒
* @param connectionTimeoutMs connection timeout 连接超时时间 单位毫秒
* @param retryPolicy retry policy to use 策略
* @return client
*/
CuratorFramework client =
CuratorFrameworkFactory.newClient("192. 168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);
client.start();
}
}
/**
* 建立连接 方式二
*/
@Test
public void testConnection2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
// 第二种方式
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("itheima");;
builder.build().start();
}
注意这里建议把namespace加上,相当于是根目录
添加节点
public class CuratorTest {
// 声明为成员变量
private CuratorFramework client;
/**
* 建立连接 方式二
*/
@Before
public void testConnection2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
// 第二种方式
client = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("itheima").build();
client.start();
}
/**
* 创建节点
* 持久、临时、顺序
* 设置数据
* <p>
* 1.基本创建
* 2.创建节点,带有数据
* 3.设置节点的类型
* 4.创建多级节点
*/
@Test
public void testCreate() throws Exception {
// client操作
}
@After
/**
* 关闭连接
*/
public void close() {
if (null != client) {
client.close();
}
}
}
(1).创建一个基础的节点
我们先创建一个基础的节点
CuratorTest.java
@Test
public void testCreate() throws Exception {
String path = client.create().forPath("/app4");
System.out.println(path);
}
(2).创建一个带有数据的节点
CuratorTest.java
@Test
public void testCreateValue() throws Exception {
String path = client.create().forPath("/app5", "heima".getBytes());
System.out.println(path);
}
(3).设置节点的类型
CuratorTest.java
创建临时节点
/**
* 设置节点类型
*
* @throws Exception
*/
@Test
public void testCreateType() throws Exception {
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app6", "number6".getBytes());
System.out.println(path);
}
(4).创建多级节点
CuratorTest.java
/**
* 多级节点
*
* @throws Exception
*/
@Test
public void testCreateManyTypes() throws Exception {
// creatingParentsIfNeeded()如果父节点不存在,就创建一个节点
String path = client.create().creatingParentsIfNeeded().forPath("/app8/p8", "number8".getBytes());
System.out.println(path);
}
查询节点
(1).查询数据
CuratorTest.java
/**
* 获取、查询
* 1.查询数据
* 2.查询子节点
* 3.查询节点的状态 ls -s
*/
@Test
public void testGet() throws Exception {
byte[] bytes = client.getData().forPath("/app7");
System.out.println(new String(bytes));
}
(2).查询子节点
CuratorTest.java
/**
* 获取、查询
* 2.查询子节点
*/
@Test
public void testGetSon() throws Exception {
List<String> path = client.getChildren().forPath("/app8");
System.out.println(path);
}
(3).查询节点的状态 ls -s
CuratorTest.java
/**
* 3.查询节点的状态 ls -s
*/
@Test
public void testGetStatus() throws Exception {
Stat stat = new Stat();
System.out.println("查询前:" + stat);
client.getData().storingStatIn(stat).forPath("/app8/p8");
System.out.println("查询后:" + stat);
}
修改节点
修改数据
修改前用Zookeeper查询
/**
* 修改数据
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app7", "dong77".getBytes());
}
根据版本修改数据⭐(推荐)
CuratorTest.java
/**
* 按照版本修改
*
* @throws Exception
*/
@Test
public void testSetForVersion() throws Exception {
int version = 0;
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app7");
version = stat.getVersion();
System.out.println("当前version是:" + version);
client.setData().withVersion(version).forPath("/app7", "luka7".getBytes());
}
删除节点
/**
* 1.删除单个节点
*
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
client.delete().forPath("/app5");
}
删除带有子节点的节点
删除前节点app7是有子节点的
/**
* 2.删除带有子节点的节点
*
* @throws Exception
*/
@Test
public void testDeleteWithSon() throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/app7");
}
必须成功的删除节点⭐(推荐):防止网络抖动,本质抖动。
删除前app4还在
/**
* 3.必须成功的删除节点
*
* @throws Exception
*/
@Test
public void testDeleteSucceed() throws Exception {
client.delete().guaranteed().forPath("/app4");
}
回调
删除前有节点
/**
* 4.回调
*
* @throws Exception
*/
@Test
public void testDeleteReturnFun() throws Exception {
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了");
System.out.println(event);
}
};
client.delete().inBackground(callback).forPath("/app8");
}
Watch事件监听
(1).NodeCache给指定一个节点注册监听器
用代码实现一下:
命名空间下面可能没有子节点的话,过一会儿就会删除命名空间
CuratorTest.java
/**
* NodeCache : 只是监听某一个特定的节点
*
* @throws Exception
*/
@Test
public void testNodeCache() throws Exception {
while (true) {
/**
* Params:
* client – curztor client 客户端
* path – the full path to the node to cache 要缓存的节点的完整路径
* dataIsCompressed – if true, data in the path is compressed 如果为 true,则路径中的数据被压缩,默认不压缩
*/
// 1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/app1");
// 2.注册监听8
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
}
});
// 3.开启监听 如果为 true,将在此方法返回之前调用, rebuild() 以便获得节点的初始视图
nodeCache.start(true);
}
}
现在想知道节点变成了什么,这里需要修改代码
CuratorTest.java
/**
* NodeCache : 只是监听某一个特定的节点
*
* @throws Exception
*/
@Test
public void testNodeCache() throws Exception {
/**
* Params:
* client – curztor client 客户端
* path – the full path to the node to cache 要缓存的节点的完整路径
* dataIsCompressed – if true, data in the path is compressed 如果为 true,则路径中的数据被压缩,默认不压缩
*/
// 1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/app1");
// 2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
// 获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("修改后数据是:" + new String(data));
}
});
// 3.开启监听 如果为 true,将在此方法返回之前调用, rebuild() 以便获得节点的初始视图
nodeCache.start(true);
while (true) {
}
}
(2).PathChildrenCache(监听某个节点的子节点们 )
测试前先给app1节点创建3个子节点
/**
* PathChildrenCache : 监控一个ZNode的所有子节点.
*
* @throws Exception
*/
@Test
public void testPathChildrenCache() throws Exception {
/**
* 如果为 true,则除了统计信息之外,还会缓存节点内容
*/
// 1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", true);
// 2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了");
System.out.println(event);
}
});
pathChildrenCache.start();
while (true) {
}
}
那现在我想看到具体子节点的变化情况和变化的值呢。修改CuratorTest.java修改PathChildrenCache构造函数,最后一个值是false,然后输出监听信息。
/**
* PathChildrenCache : 监控一个ZNode的所有子节点.
*
* @throws Exception
*/
@Test
public void testPathChildrenCache() throws Exception {
/**
* 如果为 true,则除了统计信息之外,还会缓存节点内容
*/
// 1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", false);
// 2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了");
System.out.println(event);
// 1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
// 2.判断类型是否是update
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
byte[] bytes = event.getData().getData();
System.out.println("数据被更新了:" + new String(bytes));
} else if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
byte[] bytes = event.getData().getData();
System.out.println("数据被添加了:" + new String(bytes));
} else if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
byte[] bytes = event.getData().getData();
System.out.println("数据被删除了:" + new String(bytes));
}
}
});
pathChildrenCache.start();
while (true) {
}
}
(3).TreeCache(监听某个节点自己和所有的子节点们)
CuratorTest.java
/**
* TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
*
* @throws Exception
*/
@Test
public void testTreeCache() throws Exception {
TreeCache treeCache = new TreeCache(client, "/app1");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("整个树节点有变化");
System.out.println(event);
}
});
treeCache.start();
while (true) {
}
}
也想输出树节点的信息,继续修改CuratorTest.java
/**
* TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
*
* @throws Exception
*/
@Test
public void testTreeCache() throws Exception {
TreeCache treeCache = new TreeCache(client, "/app1");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("整个树节点有变化");
System.out.println(event);
TreeCacheEvent.Type type = event.getType();
if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)) {
byte[] bytes = event.getData().getData();
System.out.println("树节点数据被更新了:" + new String(bytes));
} else if (type.equals(TreeCacheEvent.Type.NODE_ADDED)) {
byte[] bytes = event.getData().getData();
System.out.println("树节点数据被添加了:" + new String(bytes));
} else if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {
byte[] bytes = event.getData().getData();
System.out.println("树节点数据被删除了:" + new String(bytes));
} else {
System.out.println("树节点无操作");
}
}
});
treeCache.start();
while (true) {
}
}
分布式锁
原理
当客户端想要获得锁,则创建节点,使用完锁,则删除该节点。
模拟12306售票案例
创建一个测试类LockTest.java
package com.itheima;
/**
* @ClassName: LockTest
* @Description:
* @Author: wty
* @Date: 2023/3/14
*/
public class LockTest {
public static void main(String[] args) {
Tick12306 tick12306 = new Tick12306();
// 创建客户端
Thread t1 = new Thread(tick12306, "携程");
Thread t2 = new Thread(tick12306, "飞猪");
Thread t3 = new Thread(tick12306, "去哪儿");
t1.start();
t2.start();
t3.start();
}
}
创建实体类模拟12306抢票操作
package com.itheima;
/**
* @ClassName: Tick12306
* @Description:
* @Author: wty
* @Date: 2023/3/14
*/
public class Tick12306 implements Runnable {
// 数据库的票数
private int tickets = 100;
@Override
public void run() {
while (true) {
if (tickets > 0) {
System.out.println("线程:" + Thread.currentThread().getName() + "买走了票,剩余" + (--tickets));
}
}
}
}
解决方案:用分布式锁解决
添加工具类ClientConnection.java
public class ClientConnection {
public static CuratorFramework getConnection() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192. 168.149.135:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000).retryPolicy(retry).build();
client.start();
return client;
}
}
修改Tick12306.java
public class Tick12306 implements Runnable {
private InterProcessMutex lock;
// 数据库的票数
private int tickets = 100;
public Tick12306() {
CuratorFramework client = ClientConnection.getConnection();
lock = new InterProcessMutex(client, "/itheima/lock");
}
@Override
public void run() {
while (true) {
// 获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if (tickets > 0) {
System.out.println("线程:" + Thread.currentThread().getName() + "买走了票,剩余" + (--tickets));
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
zookeeper 集群
配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183
修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
模拟集群异常
(1)首先我们先测试如果是从服务器挂掉,会怎么样
把3号服务器停掉,观察1号和2号,发现状态并没有变化。
由此得出结论,3个节点的集群,从服务器挂掉,集群正常
(2)我们再把1号服务器(从服务器)也停掉,查看2号(主服务器)的状态,发现已经停止运行了。
由此得出结论,3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。
(3)我们再次把1号服务器启动起来,发现2号服务器又开始正常工作了。而且依然是领导者。
(4)我们把3号服务器也启动起来,把2号服务器停掉,停掉后观察1号和3号的状态。
(5)我们再次测试,当我们把2号服务器重新启动起来启动后,会发生什么?2号服务器会再次成为新的领导吗?结果2号服务器启动后依然是跟随者(从服务器),3号服务器依然是领导者(主服务器),没有撼动3号服务器的领导地位。由此我们得出结论,当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者。