当前位置: 首页 > article >正文

《Zookeeper 分布式过程协同技术详解》读书笔记-2

目录

  • zk的一些内部原理和应用
    • 请求,事务和标识
      • 读写操作
      • 事务标识(zxid)
    • 群首选举
    • Zab协议(ZooKeeper Atomic Broadcast protocol)
    • 文件系统和监听通知机制
      • 分布式配置中心, 简单Demo
        • java code
      • 集群管理
        • code
      • 分布式锁

zk的一些内部原理和应用

请求,事务和标识

读写操作

  1. 只读请求(exists, getData,getChildren)会在zk服务其本地处理,然后返回给客户端(所以zk处理以只读请求为主要负载时,性能会很高,可以增加更多的服务器到zk集群,这样能处理更多的读请求)

因为ZooKeeper集群中所有的server节点都拥有相同的数据,所以读的时候可以在任意一台server节点上,客户端连接到集群中某一节点,读请求,然后直接返回。当然因为ZooKeeper协议的原因(一半以上的server节点都成功写入了数据,这次写请求便算是成功),读数据的时候可能会读到数据不是最新的server节点,所以比较推荐使用watch机制,在数据改变时,及时感知到

  1. 写操作(create, delete, setData) 将会被转发给群首(群首会执行相应请求,并形成状态更新,我们称为事务(Transaction))

在这里插入图片描述

  1. Client向Zookeeper的server1发送一个写请求,客户端写数据到服务器1上;
  2. 如果server1不是Leader,那么server1会把接收到的写请求转发给Leader;然后Leader会将写请求转发给每个server;
    • server1和server2负责写数据,并且两个Follower的写入数据是一致的,保存相同的数据副本;
    • server1和server2写数据成功后,通知Leader;
  3. 当Leader收到集群半数以上的节点写成功的消息后,说明该写操作执行成功;
    • 这里是3台服务器,只要2台Follower服务器写成功就ok
    • 因为client访问的是server1,所以Leader会告知server1集群中数据写成功;
  4. 被访问的server1进一步通知client数据写成功,这时,客户端就知道整个写操作成功了

事务标识(zxid)

当群首产生了一个事务,就会为该事务分配一个标识符,我们称之 为ZooKeeper会话ID(zxid),通过Zxid对事务进行标识,就可以按照群 首所指定的顺序在各个服务器中按序执行

zxid为一个long型(64位)整数,分为两部分:时间戳(epoch)部 分和计数器(counter)部分。每个部分为32位,在我们讨论zab(Zookeeper Atomic Broadcast )协议 时,我们就会发现时间戳(epoch)和计数器(counter)的具体作用, 我们通过该协议来广播各个服务器的状态变更信息。

eg:

setData 加上事务(版本和新数据值),一个事务为一个单位,以原子方式执行;ZooKeeper集群以事务方式运行,并确保所 有的变更操作以原子方式被执行,同时不会被其他事务所干扰;并不存在传统的关系数据库中所涉及的回滚机制,而是 确保事务的每一步操作都互不干扰

同时一个事务还具有幂等性,也就是说,我们可以对同一个事务执 行两次,我们得到的结果还是一样的,我们甚至还可以对多个事务执行 多次,同样也会得到一样的结果,前提是我们确保多个事务的执行顺序 每次都是一样的。事务的幂等性可以让我们在进行恢复处理时更加简单。

实际上,ZooKeeper的每个节点维护者两个Zxid值,为别为:cZxid、mZxid。

(1)cZxid: 是节点的创建时间所对应的Zxid格式时间戳。

(2)mZxid:是节点的修改时间所对应的Zxid格式时间戳。

高32位是epoch用来标识Leader关系是否改变,每次一个Leader被选出来,它都会有一个新的epoch。低32位是个递增计数。

群首选举

群首为集群中的服务器选择出来的一个服务器,并会一直被集群所 认可。设置群首的目的是为了对客户端所发起的ZooKeeper状态变更请 求进行排序,包括:create、setData和delete操作。群首将每一个请求转 换为一个事务,将这些事务发送给追随者,确保集 群按照群首确定的顺序接受并处理这些事务。

  • 每个服务器启动后进入LOOKING状态,开始选举一个新的群首或 查找已经存在的群首,如果群首已经存在,其他服务器就会通知这个新 启动的服务器,告知哪个服务器是群首,与此同时,新的服务器会与群 首建立连接,以确保自己的状态与群首一致。

  • 如果集群中所有的服务器均处于LOOKING状态,这些服务器之间 就会进行通信来选举一个群首,通过信息交换对群首选举达成共识的选 择。在本次选举过程中胜出的服务器将进入LEADING状态,而集群中 其他服务器将会进入FOLLOWING状态。

当一个服务器进入LOOKING状态,就会发送向集群中每个服 务器发送一个通知消息,如下

投票<服务器标识,最近执行的事务的zxid信息>
(vote<sid, zxid>

在这里插入图片描述

在这里插入图片描述

快速群首选举的快速指的是什么?

Zab协议(ZooKeeper Atomic Broadcast protocol)

在接收到一个写请求操作后,追随者会将请求转发给群首,群首将探索性地执行该请求,并将执行结果以事务的方式对状态更新进行广播。一个事务中包含服务器需要执行变更的确切操作,当事务提交时, 服务器就会将这些变更反馈到数据树上,其中数据树为ZooKeeper用于 保存状态信息的数据结构(请参考DataTree类)。

之后我们需要面对的问题便是服务器如何确认一个事务是否已经提 交,由此引入了我们所采用的协议:Zab:ZooKeeper原子广播协议 (ZooKeeper Atomic Broadcast protocol)。

  • propose: leader 广播给 followers
  • accept: followers收到广播信息给leader发送确认消息
  • commit: leader收到确认仲裁数量后发送消息给follower进行提交操作
    在这里插入图片描述
    具体如下:
    在这里插入图片描述

两个保障(依靠了zxid)

  1. 一个被选举的群首确保在提交完所有之前的时间戳内需要提交的 事务,之后才开始广播新的事务。
  2. 在任何时间点,都不会出现两个被仲裁支持的群首。

为了实现第一个需求,群首并不会马上处于活动状态,直到确保仲 裁数量的服务器认可这个群首新的时间戳值。一个时间戳的最初状态必 须包含所有的之前已经提交的事务,或者某些已经被其他服务器接受, 但尚未提交完成的事务。这一点非常重要,在群首进行时间戳e的任何 新的提案前,必须保证自时间戳开始值到时间戳e-1内的所有提案被提 交。如果一个提案消息处于时间戳e’<e,在群首处理时间戳e的第一个提 案消息前没有提交之前的这个提案,那么旧的提案将永远不会被提交。

文件系统和监听通知机制

分布式配置中心, 简单Demo

  1. 在zookeeper里增加一个目录节点,并把配置信息存储在里面(作为配置中心存储)

在这里插入图片描述

  1. 多个客户端能够读取到
    在这里插入图片描述
  2. 服务端配置改变,客户端(所有注册监听的客户端)都能监听到事件
    在这里插入图片描述
    在这里插入图片描述
java code
package demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.util.concurrent.CountDownLatch;

/**
 * @Author mubi
 * @Date 2020/3/27 22:36
 */
public class ZooKeeperProSync implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;
    private static Stat stat = new Stat();

    public static void main(String[] args) throws Exception {
        // zookeeper配置数据的存放路径
        String path = "/myconfig";
        // 连接zookeeper并且注册一个监听器
        zk = new ZooKeeper("127.0.0.1:2281", 5000, new ZooKeeperProSync());
        // 等待zk连接成功的通知(等待connectedSemaphore.countDown()减少为0)
        connectedSemaphore.await();
        // 获取path目录节点的配置数据,并注册对节点的监听
        System.out.println(new String(zk.getData(path, true, stat)));
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void process(WatchedEvent event) {
        // zk连接成功通知事件
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }else if (event.getType() == EventType.NodeDataChanged) {
                // zk目录节点数据变化通知事件
                try {
                    System.out.println("配置已修改,新值为:" + new String(zk.getData(event.getPath(), true, stat)));
                    // TODO 具体业务
                } catch (Exception e) {

                }
            }
        }
    }
}

集群管理

集群管理原理:机器的加入/退出,选举leader节点

在这里插入图片描述

  • 持久节点 / 临时节点
  • 有序节点 / 无序节点

可以临时+有序构成server,然后最小编号节点作为leader节点

  1. 创建集群节点目录(持久节点)
    在这里插入图片描述
  2. 启动服务,并加入和减少机器观察
    在这里插入图片描述
    在这里插入图片描述
code
  • AppMaster
package demo;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

/**
 * @Author mubi
 * @Date 2020/3/27 23:43
 */
public class AppMaster {
    private String clusterNode = "mycluster";
    private ZooKeeper zk;
    private volatile List<String> serverList;

    public void connectZookeeper() throws Exception
    {
        // 注册全局默认watcher
        zk = new ZooKeeper("127.0.0.1:2281", 5000, new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                if (event.getType() == EventType.NodeChildrenChanged
                        && ("/" + clusterNode).equals(event.getPath()))
                {
                    try
                    {
                        updateServerList();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        });

        updateServerList();
    }

    private void updateServerList() throws Exception
    {
        List<String> newServerList = new ArrayList<String>();

        // watcher注册后,只能监听事件一次,参数true表示继续使用默认watcher监听事件
        List<String> subList = zk.getChildren("/" + clusterNode, true);
        for (String subNode : subList)
        {
            // 获取节点数据
            byte[] data = zk.getData("/" + clusterNode + "/" + subNode, false, null);
            newServerList.add(new String(data, "utf-8"));
        }

        serverList = newServerList;
        System.out.println("server list updated: " + serverList);
    }

    public static void main(String[] args) throws Exception
    {
        AppMaster ac = new AppMaster();
        ac.connectZookeeper();
        Thread.sleep(Long.MAX_VALUE);
    }
}

  • AppServer
package demo;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;


/**
 * @Author mubi
 * @Date 2020/3/27 23:44
 */
public class AppServer extends Thread
{
    private String clusterNode = "mycluster";
    private String serverNode = "server_address";
    private String serverName;

    @Override
    public void run()
    {
        try {
            connectZookeeper(serverName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void connectZookeeper(String address) throws Exception
    {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2281", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {}
        });

        // 关键方法,创建包含自增长id名称的目录,这个方法支持了分布式锁的实现
        // 四个参数:
        // 1、目录名称 2、目录文本信息
        // 3、文件夹权限,Ids.OPEN_ACL_UNSAFE表示所有权限
        // 4、目录类型,CreateMode.EPHEMERAL_SEQUENTIAL表示创建临时目录,session断开连接则目录自动删除
        String createdPath = zk.create(
                "/" + clusterNode + "/" + serverNode,
                address.getBytes("utf-8"),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("create: " + createdPath);
        Thread.sleep(Integer.MAX_VALUE);
    }

    public AppServer(String serverName)
    {
        this.serverName = serverName;
    }
}


  • Server1
package demo;

/**
 * @Author mubi
 * @Date 2020/3/27 23:46
 */

public class Server1
{
    public static void main(String[] args) throws Exception
    {
        AppServer server1 = new AppServer("Server1");
        server1.start();
    }
}

分布式锁

一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

避免羊群效应:所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力。所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反应。

在这里插入图片描述


http://www.kler.cn/a/556824.html

相关文章:

  • [Android]AppCompatEditText限制最多只能输入两位小数
  • 在 VS Code 远程连接服务器时遇到 “Bad permissions“ 错误的解决方案
  • 如何在 Ubuntu 上安装 Docker:详细步骤指南
  • 关闭超时订单和七天自动确认收货+RabbitMQ规范
  • git 目前常用的一些命令整理记录
  • 安装Liunx(CentOS-6-x86_64)系统
  • 苍穹外卖day7 缓存菜品 SpringCache缓存套餐 增删改查购物车
  • Python:Matplotlib详细使用
  • 【RL Latest Tech】安全强化学习(Safe RL):理论、方法与应用
  • 贪心算法-买卖股票的最佳时机
  • 第四届图像、信号处理与模式识别国际学术会议(ISPP 2025)
  • VMware安装Centos 9虚拟机+设置共享文件夹+远程登录
  • docker独立部署milvus向量数据库
  • rust笔记8-Deref与隐式解引用强制转换
  • cesium视频投影
  • 智能预警系统标准化处理流程
  • Memcached和redis对比了解
  • 蓝桥杯备赛1-7求和
  • Pytorch实现论文之三元DCGAN生成RGB图像用于红外图像着色生成
  • (一)趣学设计模式 之 单例模式!