ZooKeeper最全详解 (万字图文总结!!!)
目录
一、什么是ZooKeeper
1.1 ZooKeeper的特点
1.2 ZooKeeper架构
1.3 ZooKeeper数据模型
1.4 数据节点类型
二、Zookeeper安装
2.1 单机安装
2.2 集群安装
2.3 ZooKeeper ACL使用
2.4 ZooKeeper使用场景
2.5 服务启动流程
2.5.1 单机启动
2.5.2 集群启动
三、Zookeeper之ZAB
3.1 什么是ZAB协议
3.2 Watcher监听
3.3 存储机制
3.4 会话机制
四、Zookeeper运维及配置
4.1 参数配置
4.2 服务端监控
4.3 客户端启动流程
4.4 ZooKeeper客户端
4.5 Curator
一、什么是ZooKeeper
Zookeeper 是一个开源的分布式协调服务【Google Chubby的开源实现】。ZooKeeper 可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
1.1 ZooKeeper的特点
-
顺序一致性 :来自客户端的更新将按发送顺序应用。
-
原子性:更新成功或失败。无部分结果。
-
单个系统映像:客户端将看到相同的服务视图,无论它连接到哪个服务器。即,即使客户端故障转移到具有相同会话的不同服务器,客户端也永远不会看到系统的旧视图。
-
可靠性:应用更新后,它将从那时起一直存在,直到客户端覆盖更新。
-
及时性:保证系统的客户端视图在特定时间范围内是最新的。
1.2 ZooKeeper架构
ServerCnxnFactory:向外暴露TCP长链接,维护双向链接通道,分发请求。
LearnerCnxAcceptor:Leader服务Socket服务端,接收Follower、Observer服务的创建链接请求,创建LearnerHandler,用于内部数据交互。
FastLeaderElection:Leader内部选举,为避免双方重复创建链接,由myid
大的向myid
小的创建链接
SessionTracker:会话管理及独立线程会话超时清理
ZKDatabase:ZooKeeper的内存数据库,负责管理DataTree、事务日志,启动时从事务日志和快照中恢复到该内存数据库,运行期,将接收到的事务请求写入事务日志,当触发快照操作时,提供数据。
1.3 ZooKeeper数据模型
ZooKeeper 的数据节点可以视为树状结构,树中的各节点被称为 Znode,一个 Znode 可以有多个子节点,ZooKeeper 中的所有存储的数据是由 znode 组成,并以 key/value 形式存储数据。整体结构类似于 linux 文件系统的模式以树形结构存储,其中根路径以 / 开头
1.4 数据节点类型
-
持久节点【PERSISTENT】:数据节点被创建后,就会一直存在ZooKeeper服务器上,直到有删除操作主动清除节点数据。
-
持久顺序节点【PERSISTENT_SEQUENTIAL】:在持久性的基础上,又具有了顺序性特性。每个父节点都会为它的第一级子节点维护一份顺序,用于记录每个子节点的先后顺序。在创建子节点过程中,ZooKeeper会自动为给定的节点名加上一个数字后缀。
-
临时节点【EPHEMERAL】:临时节点的生命周期和客户端的会话绑定在一起,如果客户端会话失效,该会话对应的临时节点就会被自动清理。临时节点不能创建子节点。
-
临时顺序节点【EPHEMERAL_SEQUENTIAL】:在临时节点的特性上,添加了顺序性特点。
二、Zookeeper安装
2.1 单机安装
-
安装JDK环境并配置环境变量
JAVA_HOME
-
下载ZooKeeper软件
# mkdir -p /opt/soft/zookeeper/
# wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
# tar -xvzf apache-zookeeper-3.8.2-bin.tar.gz
-
配置相关调整
// 创建事务日志和快照目录
# mkdir -p /data/zk/data
# mkdir -p /data/zk/log
//复制并修改配置信息
# cp -r /opt/soft/zookeeper/apache-zookeeper-3.8.2-bin/conf/zoo_sample.cfg /opt/soft/zookeeper/apache-zookeeper-3.8.2-bin/conf/zoo.cfg
#vi /opt/soft/zookeeper/apache-zookeeper-3.8.2-bin/conf/zoo.cfg
//修改快照路径并新增事务日志目录
dataDir=/data/zk/data
dataLogDir=/data/zk/log
-
服务启动
# /opt/soft/zookeeper/apache-zookeeper-3.8.2-bin/bin/zkServer.sh start
2.2 集群安装
集群安装相比单机安装,需要在配置中维护节点的角色是参与者或观察者【默认角色为参与者】和集群节点的各个成员,3.5.0开始可用通过动态配置完成集成成员的维护
-
zoo.cfg配置调整为动态配置
## 去除clientPort,改为动态配置
##clientPort=2181
## 跳过acl检查。如果不设置成yes,则需要superuser权限才能执行reconfig
skipACL=yes
## 开启动态配置
reconfigEnabled=true
## 加载dynamic配置文件的路径
dynamicConfigFile=/conf/zoo.cfg.dynamic
-
初始化集群成员配置
## zoo1\zoo2\zoo3为各节点地址,2888为LF交互端口,3888为Leader选举端口,2181为向客户端暴漏端口,各地址对应端口火墙互通
server.1=zoo1:2888:3888;2181
server.2=zoo2:2888:3888;2181
server.3=zoo3:2888:3888;2181
-
各个节点设置myid文件
// 每个节点设置的不同,例如zoo1设置为1
# echo '1' > /data/zk/data/myid
-
启动各个节点服务,并检测配置
# ${ZK_HOME}/bin/zkCli.sh
//查看维护的节点配置
> config
server.1=zoo1:2888:3888:participant;0.0.0.0:2181
server.2=zoo2:2888:3888:participant;0.0.0.0:2181
server.3=zoo3:2888:3888:participant;0.0.0.0:2181
//动态添加4节点
> reconfig -add 4=zoo4:2888:3888;2181
//动态删除1节点
> reconfig -remove 1
-
执行reconfig命令后,需确认扩容/缩容是否成功:执行reconfig命令只意味着激活新配置的请求已发出,至于新配置有没有在所有节点上生效需要用户自行确认
-
应当尽量避免执行reconfig -remove leader节点,防止集群短暂不可用
-
zk客户端可通过监听zk路径:/zookeeper/config实时监听zk节点列表的变化(前提是:客户端配置的zk节点列表中至少一个可用)。得到完整的zk节点列表之后,zk客户端可平衡每个zk节点的请求数,客户端也无需重新配置zk节点列表甚至重启
ZooKeeper CLI使用
//启动zk客户端
# ${ZK_HOME}/bin/zkCli.sh
//查看客户端支持的命令
#help
//创建持久节点
# create /persistent_node
//创建持久顺序节点
# create -s /persistent_sequential_node mydata
//创建临时节点
# create -e /ephemeral_node mydata
//创建临时顺序节点
# create -s -e /ephemeral_sequential_node mydata
2.3 ZooKeeper ACL使用
为避免存储到节点的数据误修改或删除,ZooKeeper提供了ACL进行权限控制。
权限组成
Zookeeper 的权限由[scheme : id :permissions]三部分组成
Permissions 内置可选线:
-
CREATE:允许创建子节点
-
READ:允许从节点获取数据并列出其子节点
-
WRITE:允许为节点设置数据
-
DELETE:允许删除子节点
-
ADMIN:允许为节点设置权限
Schemes 内置可选项:
-
world:默认模式,所有客户端都拥有指定的权限。world下只有一个id选项anyone,通常组合写法为 world:anyone:[permissons]
-
auth:只有经过认证的用户才拥有指定的权限。通常写法为 auth:user:password:[permissons],使用这种模式时,你需要先进行登录,之后采用auth模式设置权限时,user和password都将使用登录的用户名和密码
-
digest:只有经过认证的用户才拥有指定的权限。通常写法为 auth:user:BASE64(SHA1(password)):[permissons],这种形式下的密码必须通过SHA1和BASE64进行双重加密
-
ip:限制只有特定IP的客户端才拥有指定的权限。通常写法为 ip:127.0.0.1:[permissions]
-
super:代表超级管理员,拥有所有的权限,通过启动参数
-Dzookeeper.DigestAuthenticationProvider.superDigest
设置用户名和密码
2.4 ZooKeeper使用场景
ZooKeeper使用场景如:集群管理、Master 选举、分布式锁和分布式队列等功能,如下介绍两种场景分布式锁和分布式原子操作
分布式锁: 基于临时节点的顺序性和监听机制实现,首次基于锁节点创建临时顺序节点时,由于锁节点下无临时节点,则该线程持有锁成功,其他线程抢占锁时,创建临时顺序节点,并查询该锁节点下的临时节点是否有大于该临时节点的节点,如果存在则添加离该临时节点最近的上一个节点添加监听事件,等待监听事件触发获取锁。锁的重入性由持有锁的应用内存实现
分布式原子操作: 基于节点的版本号实现,如果节点不存在则创建节点,之后操作时先查询节点数据,基于查询到的版本号更新该节点数据,更新失败,则查询节点数据重新计算新值基于新的版本号更新
2.5 服务启动流程
2.5.1 单机启动
-
统一由QuorumPeerMain作为启动类。无论是单机版还是集群模式启动服务,入口都是该类。
-
解析配置文件zoo.cfg。Zookeeper首先会进行配置文件的解析,配置文件的解析其实就是对zoo.cfg文件的解析。
-
创建并启动历史文件清理器DatadirCleanupManager。从3.4.0版本开始增加了自动清理历史数据文件的机制,包含对事务日志和快照数据文件进行定时清理
-
判断是集群模式还是单机模式启动
-
再次进行配置文件zoo.cfg的解析
-
创建服务器实例ZookeeperServer
-
创建服务器统计器ServerStats
-
创建Zookeeper数据管理器FileTxnSnapLog。该类是ZooKeeper上层服务器和底层数据存储之间的对接层,提供了一些了操作数据文件的接口,包括事务日志文件和快照数据文件。ZooKeeper根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog
-
设置服务器tickTime和会话超时时间限制
-
创建ServerCnxnFactory。从3.4.0引入Netty
-
初始化ServerCnxnFactory。首先会初始化一个Thread,作为整个ServerCnxnFactory的主线程,然后在初始化NIO服务器
-
启动ServerCnxnFactory主线程。默认暴漏2181端口,但还无法处理客户端请求
-
恢复本地数据。每次在ZooKeeper启动的时候,都需要从本地快照数据文件和事务日志文件中进行数据恢复
-
创建并启动会话管理器
SessionTracker
, 初始化ZooKeeper的请求处理链:PreRequestProcessorProcessor->SyncRequestProcessor->FinalRequestProcessor -
注册JMX服务
-
注册ZooKeeper服务器实例,对外提供服务。
2.5.2 集群启动
集群启动相比单机启动,启动入口相同,单机启动协调管理由ZookeeperServer
完成,集群启动协调管理由QuorumPeer
完成,QuorumPeer
是Quorum集群模式下特有的对象,是Zookeeper服务器实例 (ZooKeeperServer)的托管者,QuorumPeer代表了集群中的一台机器,QuorumPeer
做如下工作:
-
QuorumPeer会不断检测当前服务器实例的运行状态,同时根据情况发起
Leader选举
、选举完成后的数据同步、各个角色的处理链初始化【LeaderZooKeeperServer
初始化Leader的处理链,FollowerZooKeeperServer
初始化Follower的处理链,ObserverZooKeeperServer
初始化Observer的处理链】 -
将核心组件如FileTxnSnapLog、ServerCnxnFactory、ZKDatabase注册到QuorumPeer管理,同时配置QuorumPeer的参数,如服务器列表地址、Leader选举算法和会话超时时间限制等
三、Zookeeper之ZAB
3.1 什么是ZAB协议
Zab协议是一个分布式一致性算法,让ZK拥有了崩溃恢复和原子广播的能力,进而保证集群中的数据一致性
Zab协议分为四个核心步骤:
-
选举: 在ZooKeeper集群中选举中一个Leader
-
发现: Leader中会维护一个Follower的列表和Observer列表并与之通信
-
同步: Leader会把自己的数据同步给Follower和Observer, 做到多副本存储
-
广播: Leader接受Follower的事务Proposal,然后将这个事务性的proposal广播给其他learner
初始化Leader选举
Leader选举中具有投票的各服务节点通过QuorumCnxManager
管理与其他节点的通信,为了避免两台机器之间重复创建TCP连接,只允许SID大的服务器主动和其他服务器建立连接,否则断开连接;选举的核心处理由FastLeaderElection
完成
Leader选举的相关概念:
-
外部投票:特指其他服务器发送的投票
-
内部投票:服务器自身当前的投票
-
选票轮次:Zookeeper服务器Leader选举的轮次,即logicalclock
-
PK:指对内部投票和外部投票进行一个对比来确定是否需要变更内部投票,由
FastLeaderElection.totalOrderPredicate
完成 -
Quorum:过半机器数
FastLeaderElection
处理Leader选举的核心内容如下:
-
sendqueue:选票发送队列,用来保存待发送的选票
-
recvqueue:选票接收队列,用于保存接收到的外部投票
-
WorkerReceiver:选票管理器。该接收器会不断地从QuorumCnxManager中获取出其他服务器发来的选票消息,并将其转换成一个选票,然后保存到recvqueue队列中去。在选票的接收过程中,如果发现该外部投票的选举轮次小于当前服务器,则直接忽略这个外部投票,同时立即发出自己的内部投票。如果当前服务器状态不是LOOKING,即已经选举出Lader,那么也将忽略该消息。如果接收的消息来自Observer服务器,则忽略该消息,并将自己当前的投票发送出去。
-
WorkerSender:选票发送器,会不断的从sendqueue队列中获取待发送的选票,并将其传递到底层QuorumCnxManager中去。
QuorumCnxManager
处理Leader选举的核心内容如下:
-
SendWorker:消息发送器,对应一台远程的ZooKeeper服务器,负责消息的发送,
sendWorkerMap
维护与各个通信服务节点的通道,queueSendMap
维护与各个服务通信节点的待发送数据,lastMessageSent
维护各个服务通信节点的最后一条待发送数据 -
RecvWorker:消息接收器,不断从TCP连接中读取消息,并将其保存到recvQueue队列中
-
lookForLeader:该方法协调、统计、计算出最终的Leader节点
一致性机制
Leader和Learner启动期交互过程【数据同步及消息广播】
-
完成Leader选举之后,每个服务器都会根据自己的服务器角色创建相应的服务器实例,并开始进入各自角色的主流程。
-
Leader服务器启动Learner接收器LearnerCnxAcceptor。接收器用来负责接收所有非Leader服务器的连接请求。所有的Learner服务器在启动完毕后,会从Leader选举的投票结果中找到当前结集群中Leader服务器,然后与其建立来凝结。
-
Leader接收来自其他机器的连接创建请求后,会创建一个LearnerHander实例,每个LearnerHander实例对应一个Leader与Learner服务器之间的连接。负责leader和Learner服务器之间的消息通信和数据同步,下图为节点发现和数据同步
节点发现: Leader选举成功后,Leader启动Socket服务端,Learner节点向Leader建立Socket连接,Leader维护所有的节点连接
数据同步:各节点注册到Leader后,会将currentEpoch和lastZxid上送给Leader,Leader根据自身的epoch【当前轮次】
、minCommittedLog【提议缓存队列committedLog中最小ZXID】
和maxCommittedLog【提议缓存队列committedLog中最大ZXID】
处理各节点的同步方式:
-
DIFF: peerLastZxid介于minCommittedLog和maxCommittedLog之间,通知Learner差异同步
-
TRUNC:peerLastZxid大于maxCommittedLog,通知Learner回滚
-
SNAP:peerlastZxid小于minCommittedLog,通知Learner全量同步
-
UPTODATE:通知Learner服务器,已经完成了数据同步,可以开始对外提供服务
消息广播: Leader接受Learner转发的事务Proposal,然后将这个事务性的proposal广播给其他learner,ZooKeeper事务请求通常指的是创建节点、更新节点、删除节点以及创建会话等请求
消息广播的实现基于责任链模式,各个节点角色构建的处理链不同,非事务请求各节点可以直接处理,事务请求则同一转发给Leader处理
-
PrepRequestProcessor:Leader服务器的请求预处理器,该处理器能够识别当前客户端请求是否是事务请i去。如果是事务请求,该处理器会对其进行一系列的预处理,诸如创建请求事务头、事务体、会话检查、ACL检查和版本检测等
-
ProposalRequestProcessor:该处理器是Leader服务器的事务投票处理器,也是leader服务器事务处理流程的发起者。对于非事务请求,该类会直接将请求转给CommitProcessor处理器,不做其他处理;对于事务请求,处理将请求交给CommitProcessor处理外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器来发起一次集群内的事务投票。同时ProposalRequestProcessor还会将事务请求交给SyncRequestProcessor进行事务日志的记录。
-
SyncRequestProcessor:该处理器是事务日志记录处理器,主要用来将事务请求记录到事务日志文件中去,同时还会触发ZooKeeper进行数据快照。
-
AckRequestProcessor:该处理器是Leader特有的处理器。主要负责SyncRequestProcessor处理器完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。
-
CommitProcessor:是事务提交处理器。对于非事务请求,该处理器会直接将其交给下一个处理器进行处理;对于事务请求,该处理器会等待集群内针对Proposal的投票直到该Proposal可被提交。利用CommitProcessor处理器。每个服务器都可以很好地控制对事务请求的顺序处理。
-
ToBeCommitProcessor:该处理器有一个toBeApplied队列,专门用来存储那些已经被CommitProcessor处理过的可被提交的Proposal,该处理器将这些请求逐个交付给FinalRequestProcessor处理器进行处理,等到FinalRequestProcessor处理器完成之后,再将其从toBeApplied队列中移除。
-
FinalRequestProcessor:该处理器主要用来进行客户端请求返回之前的收尾工作,包括创建客户端请求的响应;针对事务请求,该处理器还会负责将事务应用到内存数据库中去。
Learner特有的处理器
-
FollowerRquestProcessor\ObserverRequestProcessor:识别如果是事务请求,将该事务请求转发给Leader服务器,Leader服务器再接收到这个事务请求后,将会将其提交到请求处理链,按照正常事务请求进行处理。
-
SendAckRequestProcessor:Follower事务日志记录反馈,在完成该事务日志记录后,会向Leader服务器发送ACK消息以表明自身完成了事务日志的记录工作。
报文结构
ZooKeeper使用jute序列化传输报文,报文的请求和响应格式如下:请求报文由三部分组成:请求报文长度 + 请求头 + 请求体 响应报文由三部分组成:响应报文长度 + 响应头 + 响应体
3.2 Watcher监听
监听节点类型:
-
None:连接建立事件
-
NodeCreated:节点创建
-
NodeDeleted:节点删除
-
NodeDataChanged:节点数据变化
-
NodeChildrenChanged:子节点列表变化
-
DataWatchRemoved:节点监听被移除
-
ChildWatchRemoved:子节点监听被移除
DataTree中会托管两个WatchManager,分别是dataWatches和childWatches,分别对应数据变更Watcher和子节点变更Watcher。
WatchManager是Zookeeper服务端的管理者,其内部两个维度对Watch存储:
watchTable:从数据节点路径的粒度管理Watcher watch2Paths:从Watcher的粒度来控制事件触发需要触发的数据节点
监听事件处理流程:在对指定节点进行数据操作后,调用WatchManager的triggerWatch()方法来触发相关的事件,无论是dataWatches还是childWatches管理器,如下:
服务端处理流程如下:
-
将通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)封装成一个WatchedEvent对象
-
根据数据节点的节点路径从watchTable中取出对应的Watcher。如果没找到,说明没有任何客户端在该数据节点上注册过Watcher,直接退出。如果找到了该Watcher,会将其提取出来,同时会直接从watchTable和watch2Paths中将其删除
-
逐个依次地调用找出的所有Watcher的process()方法,process()方法中,服务端会通过TCP连接向客户端发送一个WatcherEvent事件
客户端处理流程如下:
-
反序列化:将服务端响应的字节流封装为WatcherEvenert事件
-
处理chrootPath:如果服务端设置了chrootPath属性,需要对服务端传回的path路径进行处理,转为客户端的一个相对路径
-
还原WatchedEvent:将WatcherEvenert事件转换为WatchedEvent事件
-
回调Watcher:将WatchedEvent事件交给EventThread处理,EventThread是客户端专门处理服务端通知事件的线程
Watcher的特点
-
一次性:事件触发后,该事件不会再次触发,需要再次注册
-
轻量级:只包含通知状态、事件类型和节点路径,变更内容需要客户端主动获取
3.3 存储机制
启动时数据恢复
启动时调用ZKDataBase.loadDataBase
恢复数据并获取当前节点的lastProcessedZxid,内部由FileTxnSnapLog.restore
完成快照和事务日志的恢复到内存数据库DataTree
-
FileSnap.deserialize
从内存快照中恢复session和节点的大部分数据 -
fastForwardfromEdits
方法调用FilTxnLog
用于恢复部分未被快照的事务日志数据
运行时数据写入
运行时事务日志和内存快照写入由处理链的SyncRequestProcessor
完成,事务日志和内存快照文件的后缀为zxid
,集群中zxid
由两部分组成【高32位由epoch
,低32位由自增序列】,选出Leader后的zxid初始化由ZxidUtils.makeZxid
实现
-
事务日志: 每次事务操作都会写入事务日志,事务日志的写入性能直接决定了服务器对事务请求的响应,事务写入近似可以被看作是一个磁盘 I/O 的过程。严格地讲,文件的不断追加写入操作会触发底层磁盘 I/O 为文件开辟新的磁盘块,即磁盘 Seek 。因此,为了避免磁盘 Seek 的频率,提高磁盘 I/O 的效率, ZooKeeper 在创建事务日志的时候就会进行文件空间“预分配”——在文件创建之初就向操作系统预分配一个很大的磁盘块,默认是64MB, 而一旦已分配的文件空间不足4KB 时,那么将会再次“预分配”,以避免随着每次事务的写入过程中文件大小增长带 来的 Seek 开销,直至创建新的事务日志。事务日志“预分配”的大小可以通过系统属性
zookeeper.preAIlocSize
来进行设置 -
内存快照:每进行一次事务日志记录之后, 都会检测当前是否需要进行数据快照。理论上进行 snapCount 次事务操作后就会开始数据快照,但是考虑到数据快照对于 ZooKeeper 所在机器的整体性能的影响,需要尽量避免 ZooKeeper 集群中的所有机器在同一时刻进行数据快照。因此 ZooKeeper 在具体的实现中,并不是严格 地按照这个策略执行的,而是采取“过半随机”策略,即符合如下条件就进行数据快照:
(logCount > (snapCount / 2 + randRoll)) || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes /2 + randSize)
-
snapCount由
zookeeper.snapCunt
设置,默认值为100000,randRoll 为 0 ~snapCount/2 之间的随机数 -
snapSizeInBytes由
zookeeper.snapSizeLimitInKb
设置,默认值为4GB,randSize 为 0 ~snapSizeInBytes/2 之间的随机数
满足条件则异步线程创建内存快照
事务日志/内存快照查看[4]
//windows查看事务日志文件
java -cp .;slf4j-api-1.6.1.jar;zookeeper-3.4.6.jar org.apache.zookeeper.server.LogFormatter 日志文件的路径
//linux查看内存快照文件
java -cp .:slf4j-api-1.6.1.jar:zookeeper-3.4.6.jar org.apache.zookeeper.server.SnapshotFormatter 快照文件的路径
//新版本事务日志使用的类为org.apache.zookeeper.server.persistence.TxnLogToolkit
//zkTxnLogToolkit.sh查看事务日志
//zkSnapShotToolkit.sh查看内存快照
3.4 会话机制
ZooKeeper会话:客户端与服务端交互建立在会话创建完成后,临时节点的生命周期、Watcher机制、事务请求都建立在会话之上。
会话载体:ZooKeeper的会话载体为Session,包含如下部分
-
sessionId:唯一标识一个会话,每一次客户端建立连接的时候,Zookeeper服务端都会给其分配一个全局唯一的sessionId。保证全局唯一的方式是每次启动时初始化sessionId为【机器码(myid)+ 当前时间戳】,之后每次创建会话在该初始化的sessionId依次递增
sessionId生成算法如下:
sessionTrackerImpl.initializenextSession方法
1.获取当前时间的毫秒表示,先左移24位,之后无符号右移8位
nextSid = (System.currentTimeMillis() << 24) >>> 8;
2.将1
计算的值和机器码myid左移56位取或
nextSid = nextSid | (id << 56);
-
timeout:会话的超时时间
-
isClosing:用来标记当前会话是否已经处于被关闭的状态。如果服务端检测到当前会话的超时时间已经到了,就会将isCloseing属性标记为已经关闭,这样以后即使再有这个会话的请求访问也不会被处理
-
owner:会话拥有者,ZooKeeper服务器在收到客户端的请求后会首先检查session的owner,如果owner不是当前服务器的话,会给客户端返回SessionMovedException
SessionTracker: SessionTracker是Zookeeper中的会话管理器,负责整个zk生命周期中会话的创建、管理和清理操作
会话创建: 客户端首次与服务端建立连接时,服务端创建sessionId,并在SessionTracker注册
会话激活: 客户端向服务端发送业务请求或PING请求,服务端在接受到了客户端的请求后激活会话。如下客户端两种会激活会话:
-
当客户端向服务端发送请求的时候,包括读写请求,都会主动触发一次会话激活
-
如果客户端在sessionTimeOut / 3时间范围内尚未和服务器之间进行通信,即没有发送任何请求,就会主动发起一个PING请求,去触发服务端的会话激活操作
服务端处理场景如下:
-
检验会话是否已经被关闭,Leader会去检查会话是否被关闭,如果已经关闭,不会再去激活该会话
-
如果会话没有被关闭,则开始计算下一次的超时时间Expiration_New,计算完新的超时时间以后,将原区块中的会话进行迁移,放入新的Expiration_New对应的区块中
会话重连: 在Zookeeper运行过程中,也可能会出现会话断开后重连的情况,这个时候客户端会从清洗后的连接列表中按照顺序的方式重新建立连接,直到连接上其中一台机器为止,超时时间内连接上服务端会话状态为CONNECTED,超时后才连接上服务端的,会话状态则为EXPIRED
而在重连之前,可能因为其他原因导致的断开连接,即CONNECTION_LESS,会抛出异常org.apache.zookeeper.KeeperException$ConnectionLossException,我们需要捕获异常,并且在重连成功后,收到None-SyncConnection通知里面进行setData的处理操作即可。在这个过程中,会话可能会出现两种情况:会话失效【SESSION_EXPIRED】、会话转移【SESSION_MOVED】
会话清理:会话检查操作以后,当发现有超时的会话的时候,会进行会话清理操作,而Zookeeper中的会话清理操作,主要是以下几个步骤:
1.为了保证在清理的过程中,该会话不会再去接受和处理发来的请求,因此,在会话检查完毕后,SessionTracker会先将其会话的isClose标记为true,接着为了保证在进行会话关闭的过程中,在整个集群中都生效,Zookeeper使用了提交的方式,交给PreRequestProcessor处理器进行处理
2.在某个会话失效后,这个会话创建的相关临时节点列表都应该被删除,当会话对应的临时节点列表找到后,Zookeeper会将列表中所有的节点变成删除节点的请求,并且丢给事物变更队列OutStandingChanges中,接着FinalRequestProcessor处理器会触发删除节点的操作,从内存数据库中删除。
3.当会话对应的临时节点被删除以后,将会话从SessionTracker中移除了,主要从SessionById,sessionsWithTimeOut以及sessionsSets中将会话移除掉,最后关闭当前会话的连接NioServerCnxn
会话管理: Zookeeper中的会话管理主要是SesssionTracker负责的,内部使用了一个特殊的机制,称之为分桶策略,所谓分桶策略,其实是将类似的会话放在一个区块中进行管理,以便于zookeeper对会话进行不同区块的隔离以及同一区块的统一处理
由于会话之间的激活是按照分桶策略进行保存的,因此我们可以利用此策略优化对于会话的超时检查,在Zookeeper中,会话超时检查也是由SessionTracker负责的,内部有一个线程专门进行会话的超时检查,只要依次的对每一个区块的会话进行检查,由于分桶是按照ExpriationInterval 的倍数来进行会话分布的,因此只要在这些时间点检查即可,这样可以减少检查的次数,并且批量清理会话,实现较高的效率
四、Zookeeper运维及配置
4.1 参数配置
ZooKeeper参数配置分两种:
-
配置文件:配置文件默认读取文件路径为
$ZOOKEEPER_HOME/conf/zoo.cfg
,配置文件的解析通过QuorumPeerConfig
,如代码中无对应的参数则设置到环境变量System.setProperty("zookeeper." + key,value)
,即可通过配置文件方式设置环境变量 -
系统变量:设置Java系统属性可以在启动时后面加
-D
参数,比如:-Dzookeeper.snapCount=xxx。ZooKeeperServer
类有使用系统参数的场景。
参数配置如下:
-
dataDir
:存储内存数据库快照文件的路径 -
dataLogDir
:存储事务日志文件的路径,如果未设置则为dataDir
设置的文件路径,ZooKeeper 在响应客户端事务请求之前,需要将请求的事务日志写到磁盘上,建议给事务日志的输出配置一个单独的磁盘或者挂载点,并和快照文件路径分开 -
clientPort
:服务器监听客户端连接的端口 -
clientPortAddress
:服务器监听客户端连接的地址 -
tickTime
:单个tick的时间长度,它是ZooKeeper中使用的基本时间单元,以毫秒为单位。它用来调节心跳和超时时间 -
maxClientCnxns
:在socket级别限制单个客户端到ZooKeeper集群中单台服务器的并发连接数量,可以通过IP地址来区分不同的客户端。它用来阻止某种类型的DoS攻击,包括文件描述符资源耗尽。默认值是60。将值设置为0将完全移除并发连接的限制。 -
minSessionTimeout
:服务器允许客户端会话的最小超时时间,以毫秒为单位。默认值是2倍的tickTime -
maxSessionTimeout
:服务器允许客户端会话的最大超时时间,以毫秒为单位。默认值是20倍的tickTime -
initLimit
:默认配置文件配置是10,即tickTime属性值的10倍。它用于配置允许Followers连接并同步到Leader的最大时间。如果ZooKeeper管理的数据量很大的话可以增加这个值 -
syncLimit
:默认配置文件配置是5,即tickTime属性值的5倍。它用于配置Leader和Followers间进行心跳检测的最大延迟时间。如果在设置的时间内Followers无法与Leader进行通信,那么Followers将会被丢弃 -
electionAlg
:用于选择使用的Leader选举算法。-
0
对应于原始的基于UDP的版本 -
1
对应于快速Leader选举基于UDP的无身份验证的版本 -
2
对应于快速Leader选举基于UDP的身份验证的版本 -
3
对应于快速Leader选举基于TCP的版本。目前默认值是算法3。
-
-
peerType
:设置服务器角色,observer
和participant
-
autopurge.snapRetainCount
:当启用自动清理功能后,保留快照文件和事务日志文件数量。默认值时3 -
autopurge.purgeInterval
:用于配置触发清理任务的时间间隔,以小时为单位。默认值为0,不启动清理 -
server.*:host:port:port
:组成ZooKeeper服务集群的节点
//集群信息(服务器编号,服务器地址,LF通信端口,选举端口,服务器角色)
server.0=127.0.0.1:2182:2183:observer
//默认服务器角色为participant
server.1=127.0.0.1:2184:2185
//服务器编号myid,在服务器选主过程中,如果epoch【纪元,每次选举后+1】、zxid【事务编号】,都相同,则myid大的获取选票
-
group
:服务节点分组,如分成三个组,如果两个组正常,则服务集群可正常提供服务,组必须是不相交的,并且所有组联合后必须是 ZooKeeper 集群,结合weight
使用 -
weight.*
:设置服务器的权重,设置为0
则不参与投票
4.2 服务端监控
四字命令
四字命令是指四个字母缩写的指令,首先需要先开启四字命令
//开启所有
4lw.commands.whitelist=*
//开启指定的命令
4lw.commands.whitelist=stat, ruok, conf, isro
可以使用telnet
和nc两种方式
echo ruok | nc 127.0.0.1 5111
JMX监控
JMX(Java Management Extensions,即Java管理扩展)是一个为应用程序、设备、系统等植入管理功能的框架,JMX配置如下:
-Dcom.sun.management.jmxremote //JMX代理监听开关
-Djava.rmi.server.hostname=$JMXHOSTNAME //服务端的IP
-Dcom.sun.management.jmxremote.port=$JMXPORT //JMX代理监听端口
-Dcom.sun.management.jmxremote.authenticate=$JMXAUTH //是否开启密码认证
-Dcom.sun.management.jmxremote.ssl=$JMXSSL 是否使用SSL通讯
linux下启动远程JMX在zkEnv.sh
添加如下参数:
JMXHOSTNAME=服务器IP
JMXPORT=JMX代理监听端口
添加如上两个参数,默认不开启密码认证、不使用SSL通讯,监听的指标数据为ZKMBeanInfo
的实现类,即内存数据库、节点信息等
Prometheus集成
//zoo.cfg配置如下开启Prometheus MetricsProvider
metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
//可选项,默认为7000
metricsProvider.httpPort=7000
ZooKeeper自3.6.0版本支持数据指标采集,从grafana官网下载ZooKeeper的Dashboard模板集成
4.3 客户端启动流程
-
初始化Zookeeper对象。通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager
-
设置会话默认Watcher。如果在构造方法中传入一个Watcher对象,那么客户端会将这个对象作为默认Watcher对象保存在ClientWatchManager中
-
构造Zookeeper服务器地址列表管理器:HostProvider。对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中
-
创建并初始化客户端网络连接器:ClientCnxn。Zookeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务端的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列
-
初始化SendThread和EventThread。客户端会创建两个核心网络线程SendThread和EventThread,前者用来管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件初始。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理时间队列waitingEvents,用于存放所有等待被客户端处理的事件
-
启动SendThread和EventThread。SendThread首先会判断当前客户端的状态,进行一系列的清理性工作,为客户端发送"会话创建"请求做准备
-
获取一个服务器地址。在开始创建TCP连接之前,SendThread首先需要获取一个Zookeeper服务器的目标地址,这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP长连接
-
创建TCP长连接。获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接
-
构造ConnectRequest请求。SendThread负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试图与服务器创建一个会话。同时,Zookeeper客户端还会进一步将请求包装成网络I\O层的Package对象,放入请求发送队列outgoingQueue
-
发送请求。当客户端请求准备发送完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Package对象,将其序列化成ByteBuffer后向服务端发送请求
-
接收服务端响应。ClientCnxnSocket接收到服务端的响应后,首先判断当前客户端的状态是否是"已初始化",如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交给readConnectResult方法来处理该响应
-
处理Response。ClientCnxnSocket会对接收到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到Zookeeper服务端分配的会话sessionId
-
连接成功。连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包含readTimeout和connectTimeout等,并更新客户端状态:另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址
-
生成事件:SyncConnected-None。为了能够让上层应用知道该会话的成功创建,SendThread会生成一个事件Syncconnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程
-
查询Watcher。EventThread线程收到事件后,会从ClientWatcherManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2存储的默认Watcher,然后将其放到EventThread的waitingEvents队列中去
-
处理事件。EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的
4.4 ZooKeeper客户端
Zookeeper原生客户端
该客户端为官方自带,该客户端与服务端建立是异步的过程,创建客户端后不能直接使用,需要结合CountDownLatch
等待监听CONNECTED
后处理业务;会话超时异常时,不支持自动重连;watcher是一次性的;不支持递归创建和删除节点;
引入jar包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>版本号</version>
</dependency>
创建ZooKeeper连接
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zookeeper = new ZooKeeper(connectAddress, sessionTimeout, watchedEvent -> {
//获取监听事件的状态
Watcher.Event.KeeperState state = watchedEvent.getState();
//获取监听事件类型
Watcher.Event.EventType type = watchedEvent.getType();
//已经建立上了连接
if (Watcher.Event.KeeperState.SyncConnected == state) {
if (Watcher.Event.EventType.None == type) {
// ZooKeeper连接成功
countDownLatch.countDown();
}
}
});
//等待ZooKeeper watch通知
countDownLatch.await();
Zookeeper基础操作
//判断节点是否存在,并对该节点启动默认监听事件
zookeeper.exists(nodePath, true);
//创建持久节点,并设置对应的数据,创建节点是需要先判断该节点是否存在,否则报错
zookeeper.create(nodePath, nodeData.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//更新节点数据,-1代表不指定版本
Stat stat = zookeeper.setData(nodePath, data.getBytes(), -1);
//查询节点数据,查询该节点的数据和状态信息赋值到Stat对象
Stat stat = new Stat();
byte[] data = zookeeper.getData(nodePath, true, stat);
//删除节点数据,-1表示删除节点时,不指定版本
zookeeper.delete(nodePath, -1);
4.5 Curator
Curator是Netflix公司在原生zookeeper基础上开源的一个ZooKeeper Java客户端,现在是Apache下的一个开源项目。Curator解决了原生客户端存在的一系列问题,并提供了Fluent风格的操作API,针对业务场景,提供了如分布式锁、集群领导选举、分布式计数器、缓存机制、分布式队列等工具类
引入jar包
<!-- curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>版本号</version>
</dependency>
<!-- curator-recipes,业务场景解决方案 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>版本号</version>
</dependency>
Curator创建ZooKeeper连接
//创建方式一:Fluent方式
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDRESS)//ZK服务端地址
.sessionTimeoutMs(6000)//会话超时时间
.connectionTimeoutMs(15000)//连接超时事件
.retryPolicy(retry) //重试策略RetryPolicy
.build();
//创建方式二:
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retry);
//创建完成后启动客户端
client.start();
Curator基础操作
//判断节点是否存在,构建ExistsBuilder处理,exists不为空则存在该节点
Stat exists = client.checkExists().forPath(nodePath);
//创建节点,构建CreateBuilder处理,创建节点需先判断该节点是否存在
client.create()
.creatingParentsIfNeeded()//父节点不存在,则创建
.withMode(CreateMode.EPHEMERAL)//创建节点类型,默认持久节点
.forPath(nodePath);
//更新节点,构建SetDataBuilder处理
client.setData()
.withVersion(-1)//指定版本,默认为-1
.inBackground()//异步执行
.forPath(nodePath, nodeData.getBytes());
//查询节点数据,构建GetDataBuilder处理
Stat stat = null;
byte[] bytes = client.getData()
.storingStatIn(stat)//查询节点状态信息设置到Stat对象
.forPath(nodePath);
//查询子节点,构建GetChildrenBuilder处理
List<String> childNode = client.getChildren().forPath(nodePath);
//删除节点,构建DeleteBuilder处理
client.delete()
.deletingChildrenIfNeeded()//级联删除子节点
.forPath(nodePath);
Curator监听操作
Curator提供了三种监听方式:
-
NodeCache:监听指定的节点
-
PathChildrenCache:监听指定节点的子节点
-
TreeCache:监听指定节点和子节点及其子孙节点
//获取监听对象NodeCache
NodeCache nodeCache = new NodeCache(client, "/nodeCache");
//添加监听事件,新增、修改时触发
nodeCache.getListenable().addListener(()->{
//获取当前节点变更后的数据
nodeCache.getCurrentData().getData();
});
//启动监听,false则启动后默认不创建节点
nodeCache.start(false);
=================================================
//获取监听对象PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/childrenCache","true");
//添加监听事件,子节点新增、修改、删除时触发
pathChildrenCache.getListenable().addListener((CuratorFramework client, PathChildrenCacheEvent event) -> {
//获取子节点的改变类型
PathChildrenCacheEvent.Type type = event.getType();
//变更数据的路径和变更后的数据
ChildData childData = event.getData();
});
//启动监听事件
pathChildrenCache.start();
=================================================
//获取监听对象TreeCache
TreeCache treeCache = new TreeCache(client, "/treeCache");
//添加监听器,当前节点、子节点、孙节点的新增、修改和删除时触发
treeCache.getListenable().addListener((CuratorFramework client, TreeCacheEvent event) -> {
//获取节点的改变类型
TreeCacheEvent.Type type = event.getType();
//变更数据的路径和变更后的数据
ChildData childData = event.getData();
});
treeCache.start();