zookeeper服务器动态上下线监听案例
zookeeper服务器动态上下线监听案例
文档
- linux安装java -centos安装java -linux配置java环境变量
- zookeeper单机安装
- zookeeper集群安装
- zookeeper客户端命令行操作、节点类型及监听器
- zookeeper集群写数据原理
- java操作zookeeper
说明
两种角色:服务器、客户端
客户端监听服务器的状态,服务器上线下变化时,客户端接收到通知
无论是服务器,还是客户端,对于zookeeper来说,都是zookeeper客户端
原理
-
zookeeper临时节点的性质:zookeeper客户端断开连接后,节点自动删除
-
客户端设置监听器,监听子节点变化,监听
/servers
节点的子节点:ls -w /servers
需要注意的是,监听器仅监听一次,想要重复监听,则需要重复设置监听
-
服务器连接,创建临时节点,客户端监听,服务器上线
-
服务器断开连接,临时节点自动删除,客户端监听,服务器下线
命令行模拟服务器动态上下线监听
-
客户端连接(启动一个zookeeper客户端),设置监听
ls -w /servers
-
服务器1连接(启动一个zookeeper客户端),新增子节点,节点类型为临时有序号节点,可以通过序号获知该子节点的创建顺序,创建即表示服务器在线
create -e -s /servers/server
-
此时客户端将收到监听回调,此时可以获取子节点列表,来查看最新的服务器在线情况
ls /servers
客户端再次设置监听
ls -w /servers
-
服务器2连接,重复步骤2
-
客户端,重复步骤3
-
服务器1断开连接,创建的临时节点将自动删除
-
客户端,重复步骤3
-
服务器2断开连接,创建的临时节点将自动删除
-
客户端,重复步骤3
java代码实现服务器动态上下线监听
-
客户端示例代码
DistributeClient.java
package xin.yangshuai.zookeeper01.case1; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; public class DistributeClient { // 注意:逗号左右不能有空格 private String connectString = "192.168.145.132:2181,192.168.145.133:2181,192.168.145.134:2181"; // 2000毫秒 private int sessionTimeout = 2000; private ZooKeeper zkClient; public void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { // 初始化时,会执行一次 System.out.println("当前监听的节点:" + watchedEvent.getPath()); try { // 由于注册一次,监听一次,如果想持续监听,可重新注册 // getServerList()方法设置了监听,所以每次调用将重新注册监听 getServerList(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); } private void getServerList() throws KeeperException, InterruptedException { // 参数watch设置为true,表示使用zkClient创建时的Watcher,并监听当前节点的子节点的增删变化 // 提前创建好 /servers 节点 List<String> children = zkClient.getChildren("/servers", true); System.out.println("服务器在线数量:" + children.size()); for (String child : children) { System.out.println("服务器 " + child + " 在线"); } } }
-
服务端示例代码
DistributeServer.java
package xin.yangshuai.zookeeper01.case1; import org.apache.zookeeper.*; import java.io.IOException; public class DistributeServer { // 注意:逗号左右不能有空格 private String connectString = "192.168.145.132:2181,192.168.145.133:2181,192.168.145.134:2181"; // 2000毫秒 private int sessionTimeout = 2000; private ZooKeeper zkClient; private String node; public void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } // 注册 public void register() throws KeeperException, InterruptedException { // 创建一个带序号的临时节点 // 提前创建好 /servers 节点 node = zkClient.create("/servers/server", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("服务器 " + node + " 注册成功"); } // 断开连接 public void close() throws InterruptedException { System.out.println("服务器 " + node + " 下线"); zkClient.close(); } }
-
测试方法示例代码
DistributeTest.java
package xin.yangshuai.zookeeper01.case1; import org.apache.zookeeper.KeeperException; import java.io.IOException; public class DistributeTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { // 启动客户端,连接zookeeper,客户端设置监听 DistributeClient client = new DistributeClient(); client.getConnect(); // 启动服务器1,连接zookeeper DistributeServer server1 = new DistributeServer(); server1.getConnect(); // 启动服务器2,连接zookeeper DistributeServer server2 = new DistributeServer(); server2.getConnect(); Thread.sleep(5000); // 服务器1上线 server1.register(); Thread.sleep(5000); // 服务器2上线 server2.register(); Thread.sleep(5000); // 服务器1下线 server1.close(); Thread.sleep(5000); // 服务器2下线 server2.close(); Thread.sleep(Long.MAX_VALUE); } }
-
可以只启动客户端,保持运行状态。用命令行模拟服务端
-
仅供参考
参考资料
- 尚硅谷