[ZooKeeper Source Walkthrough] Chapter 1: How the Client Connects to the Server
Preface
The client library is Curator 5.3.0, and the ZooKeeper version is 3.6.3.
1. Creating the client object and starting the connection
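// Example values: local server, 60 s session timeout, 15 s connection timeout, exponential back-off retrying up to 3 times
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 60000, 15000, new ExponentialBackoffRetry(1000, 3));
client.start();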
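Curator's API makes creating a client simple. After newClient() returns the client object, calling start() initiates the connection to the server. start() does not wait for the connection to be established. Let's dig into the code and see what these two lines actually do.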
2. Walking through the Curator source
Here is the complete newClient() method:
/**
* Create a new client
*
* @param connectString list of servers to connect to
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param retryPolicy retry policy to use
* @return client
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
The implementation of builder():
public static Builder builder()
{
return new Builder();
}
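builder() simply returns a new Builder. The Builder constructor is empty, but the class's field initializers create a number of default objects. The fields relevant to connecting are: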
// DEFAULT_ZOOKEEPER_FACTORY is a static constant shared by all builders
private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
// So by default, zookeeperFactory is the DefaultZookeeperFactory instance defined above
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
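The pattern here is worth spelling out. Builder's constructor does nothing, and every default (including the shared factory constant) comes from a field initializer. The fluent setters overwrite those defaults, and build() hands the whole builder to the client's constructor. Below is a minimal sketch of that shape in plain Java. All class names (HandleFactory, DefaultHandleFactory, MiniFrameworkFactory, MiniClient) are hypothetical stand-ins, not Curator classes. The default timeout values are illustrative only.

```java
import java.util.Objects;

// Hypothetical stand-in for ZookeeperFactory: creates a "handle" for a connect string.
interface HandleFactory {
    String newHandle(String connectString, int sessionTimeoutMs);
}

// Hypothetical stand-in for DefaultZookeeperFactory.
class DefaultHandleFactory implements HandleFactory {
    @Override
    public String newHandle(String connectString, int sessionTimeoutMs) {
        return "handle(" + connectString + ", " + sessionTimeoutMs + "ms)";
    }
}

// Hypothetical stand-in for CuratorFrameworkImpl: reads its configuration from the builder.
class MiniClient {
    final HandleFactory factory;
    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    MiniClient(MiniFrameworkFactory.Builder builder) {
        this.factory = builder.getHandleFactory();
        this.connectString = Objects.requireNonNull(builder.getConnectString(), "connectString");
        this.sessionTimeoutMs = builder.getSessionTimeoutMs();
        this.connectionTimeoutMs = builder.getConnectionTimeoutMs();
    }
}

class MiniFrameworkFactory {
    static Builder builder() {
        return new Builder();
    }

    static class Builder {
        // Shared default, like DEFAULT_ZOOKEEPER_FACTORY: one instance for every builder.
        private static final DefaultHandleFactory DEFAULT_HANDLE_FACTORY = new DefaultHandleFactory();

        // Defaults come from field initializers (illustrative values).
        private HandleFactory handleFactory = DEFAULT_HANDLE_FACTORY;
        private String connectString;
        private int sessionTimeoutMs = 60_000;
        private int connectionTimeoutMs = 15_000;

        Builder() {
            // Intentionally empty, like Curator's Builder constructor.
        }

        Builder connectString(String connectString) { this.connectString = connectString; return this; }
        Builder sessionTimeoutMs(int ms) { this.sessionTimeoutMs = ms; return this; }
        Builder connectionTimeoutMs(int ms) { this.connectionTimeoutMs = ms; return this; }

        HandleFactory getHandleFactory() { return handleFactory; }
        String getConnectString() { return connectString; }
        int getSessionTimeoutMs() { return sessionTimeoutMs; }
        int getConnectionTimeoutMs() { return connectionTimeoutMs; }

        // Like build(): pass the builder itself to the client constructor.
        MiniClient build() {
            return new MiniClient(this);
        }
    }
}
```

DEFAULT_HANDLE_FACTORY is static final, so every builder that does not override the factory shares one instance. Every Curator Builder likewise starts out pointing at DEFAULT_ZOOKEEPER_FACTORY.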
After builder() returns, the chained calls set the server address (connectString()), the timeouts (sessionTimeoutMs() and connectionTimeoutMs()) and the retry policy (retryPolicy()). Finally, build() is called:
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
build() returns a new CuratorFrameworkImpl and passes the builder configured above to its constructor. The key part of that constructor is:
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
// ...... (rest of the constructor omitted)
}
The first line of the constructor calls makeZookeeperFactory(), which returns a ZookeeperFactory. Here is its implementation:
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory, ZKClientConfig zkClientConfig)
{
return new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
addAuthInfos(zooKeeper);
return zooKeeper;
}
};
}
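The first argument to makeZookeeperFactory() is builder.getZookeeperFactory(). As noted above, Builder initializes its zookeeperFactory field to the DEFAULT_ZOOKEEPER_FACTORY constant, which is a DefaultZookeeperFactory instance. The factory returned here is therefore a thin wrapper. Its newZooKeeper() delegates the actual creation of the ZooKeeper object to DefaultZookeeperFactory. It then applies the configured auth info via addAuthInfos() and returns the object.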
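In other words, makeZookeeperFactory() is a decorator around the configured factory: it delegates creation and applies one extra step to the result. Below is a minimal sketch of that shape. It assumes a hypothetical Handle class in place of ZooKeeper and uses a plain list of strings in place of Curator's auth info. It does not call ZooKeeper's real addAuthInfo().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a ZooKeeper handle that can carry auth entries.
class Handle {
    final String connectString;
    final List<String> authInfos = new ArrayList<>();

    Handle(String connectString) {
        this.connectString = connectString;
    }
}

// Hypothetical stand-in for ZookeeperFactory.
interface HandleFactory {
    Handle newHandle(String connectString);
}

class Factories {
    // Same shape as makeZookeeperFactory(): wrap the actual factory and post-process its result.
    static HandleFactory withAuth(final HandleFactory actual, final List<String> authInfos) {
        return new HandleFactory() {
            @Override
            public Handle newHandle(String connectString) {
                Handle handle = actual.newHandle(connectString); // delegate creation
                handle.authInfos.addAll(authInfos);             // like addAuthInfos(zooKeeper)
                return handle;
            }
        };
    }
}
```

Wrapping rather than subclassing means the extra step applies to whatever factory the user configured through the builder, not only to DefaultZookeeperFactory.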
Next comes the second statement of the CuratorFrameworkImpl constructor, which creates the CuratorZookeeperClient:
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
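The anonymous Watcher passed in here acts as an adapter. Each raw WatchedEvent from ZooKeeper is wrapped in a CuratorEvent of type WATCHED: the event's state goes into the result-code argument, and unfixForNamespace() strips the namespace prefix from the path. The wrapped event is then handed to processEvent(). Below is a simplified sketch of that adapter. RawEvent, FrameworkEvent, WatcherAdapter and the namespace-stripping rules are hypothetical approximations, not Curator's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for WatchedEvent.
final class RawEvent {
    final int state;
    final String path;

    RawEvent(int state, String path) { this.state = state; this.path = path; }
}

// Hypothetical stand-in for CuratorEvent (only a few fields).
final class FrameworkEvent {
    final String type;
    final int resultCode;
    final String path;
    final RawEvent raw;

    FrameworkEvent(String type, int resultCode, String path, RawEvent raw) {
        this.type = type; this.resultCode = resultCode; this.path = path; this.raw = raw;
    }
}

final class WatcherAdapter {
    private final String namespace; // hypothetical namespace prefix, e.g. "/app"
    private final List<Consumer<FrameworkEvent>> listeners = new ArrayList<>();

    WatcherAdapter(String namespace) { this.namespace = namespace; }

    void addListener(Consumer<FrameworkEvent> listener) { listeners.add(listener); }

    // Rough analogue of unfixForNamespace(): strip the namespace from the server-side path.
    String unfixForNamespace(String path) {
        if (path == null || namespace == null) return path;
        if (path.equals(namespace)) return "/";
        if (path.startsWith(namespace + "/")) return path.substring(namespace.length());
        return path;
    }

    // Like Watcher.process(): wrap the raw event, then dispatch it (like processEvent()).
    void process(RawEvent raw) {
        FrameworkEvent event = new FrameworkEvent("WATCHED", raw.state, unfixForNamespace(raw.path), raw);
        for (Consumer<FrameworkEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}
```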
The CuratorZookeeperClient constructor:
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
RetryPolicy retryPolicy, boolean canBeReadOnly)
{
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
// zookeeperFactory creates the ZooKeeper object; it is passed on to the ConnectionState constructor
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}
The commented line is the one that matters here, so let's follow it into the ConnectionState constructor:
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.tracer = tracer;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
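Besides storing its arguments and adding parentWatcher to the parentWatchers queue, the ConnectionState constructor creates a HandleHolder. The HandleHolder constructor only assigns fields and contains no other logic, so there is no need to follow it further.
This completes the initialization performed by client = CuratorFrameworkFactory.newClient(...). At this point no ZooKeeper object exists yet, so nothing has connected to the server.
Now let's see how client.start() triggers the connection. start() is implemented in CuratorFrameworkImpl: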
public void start() {
log.info("Starting");
// ......
// The key line: client here is the CuratorZookeeperClient instance
client.start();
// ......
}
This in turn calls CuratorZookeeperClient.start():
/**
* Must be called after construction
*
* @throws IOException errors
*/
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
// state is the ConnectionState instance
state.start();
}
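The started.compareAndSet(false, true) check turns start() into a one-shot operation. Exactly one caller can flip the flag from false to true, even under concurrency, and every later call gets an IllegalStateException. Below is a minimal sketch of the same guard. OneShotStarter and its counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical component using the same "start exactly once" guard as CuratorZookeeperClient.start().
class OneShotStarter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    final AtomicInteger bodyRuns = new AtomicInteger(); // counts how often the start body ran

    void start() {
        // Atomically flip false -> true; only one caller can ever succeed.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        bodyRuns.incrementAndGet(); // stands in for state.start()
    }
}
```

A plain boolean check would leave a gap between reading and writing the flag. compareAndSet performs the test and the update as one atomic step, so two threads racing to call start() cannot both get through.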
As we saw during initialization, state was created in the CuratorZookeeperClient constructor. ConnectionState.start() looks like this:
void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
reset();
}
The implementation of reset():
synchronized void reset() throws Exception
{
log.debug("reset");
instanceIndex.incrementAndGet();
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
handleHolder.closeAndReset();
handleHolder.getZooKeeper(); // initiate connection
}
The source comment "initiate connection" indicates that the connection is started inside handleHolder.getZooKeeper(). Let's check its source to confirm:
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}
So getZooKeeper() returns helper.getZooKeeper(), or null if helper has not been set. helper is assigned in handleHolder.closeAndReset(), which is the statement right before handleHolder.getZooKeeper() in reset(). Here is the source of closeAndReset():
void closeAndReset() throws Exception
{
internalClose(0);
Helper.Data data = new Helper.Data(); // data shared between initial Helper and the un-synchronized Helper
// first helper is synchronized when getZooKeeper is called. Subsequent calls
// are not synchronized.
//noinspection NonAtomicOperationOnVolatileField
// helper is (re)initialized here
helper = new Helper(data)
{
@Override
ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( data.zooKeeperHandle == null )
{
resetConnectionString(ensembleProvider.getConnectionString());
// zookeeperFactory is the wrapper built by makeZookeeperFactory(), which delegates to DefaultZookeeperFactory
data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper(data);
return super.getZooKeeper();
}
}
};
}
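This code is a little convoluted. The first Helper installed here is synchronized. On its first getZooKeeper() call it creates the ZooKeeper handle (if none exists yet), then replaces itself with a plain Helper that shares the same Data, so later calls skip the lock. Either way, the handle is ultimately created, through the makeZookeeperFactory() wrapper, by DefaultZookeeperFactory.newZooKeeper():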
public class DefaultZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
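newZooKeeper() constructs a ZooKeeperAdmin. ZooKeeperAdmin belongs to the ZooKeeper client library rather than to Curator, so its source is covered in the next section.
3. Walking through the ZooKeeper source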
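The Helper trick in closeAndReset() is a variant of lazy initialization. closeAndReset() installs a synchronized Helper. Its first getZooKeeper() call creates the handle, then swaps in a plain Helper sharing the same Data, so subsequent calls return the handle without taking the lock. reset() calls getZooKeeper() immediately after closeAndReset(), which is why the connection starts during start(). The sketch below models only that swap. LazyHandleHolder, Helper and Data here are simplified, hypothetical versions, and a Supplier<String> stands in for the ZooKeeper factory.

```java
import java.util.function.Supplier;

// Hypothetical, simplified model of HandleHolder's Helper swap (not Curator's real classes).
class LazyHandleHolder {
    // Shared by the first (synchronized) helper and its unsynchronized replacement.
    static class Data {
        volatile String handle;
    }

    // Plain helper: returns whatever handle the shared Data holds.
    static class Helper {
        final Data data;

        Helper(Data data) {
            this.data = data;
        }

        String getHandle() {
            return data.handle;
        }
    }

    private final Supplier<String> factory; // stands in for zookeeperFactory.newZooKeeper(...)
    private volatile Helper helper;

    LazyHandleHolder(Supplier<String> factory) {
        this.factory = factory;
    }

    // Like closeAndReset(): install a helper whose first call creates the handle.
    // (Closing the previous handle is omitted in this sketch.)
    void closeAndReset() {
        final Data data = new Data();
        helper = new Helper(data) {
            @Override
            String getHandle() {
                synchronized (this) {
                    if (data.handle == null) {
                        data.handle = factory.get(); // created at most once per reset
                    }
                    helper = new Helper(data);       // later calls bypass this lock
                    return super.getHandle();
                }
            }
        };
    }

    // Like HandleHolder.getZooKeeper(): null until closeAndReset() has run.
    String getHandle() {
        Helper current = helper;
        return (current != null) ? current.getHandle() : null;
    }
}
```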
public ZooKeeperAdmin(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly) throws IOException {
super(connectString, sessionTimeout, watcher, canBeReadOnly);
}
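ZooKeeperAdmin extends ZooKeeper. Through constructor chaining, the intermediate ZooKeeper constructors supply a default HostProvider built from connectString and a null ZKClientConfig. The call ends up in this ZooKeeper constructor: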
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
// This is the key call that starts the connection
cnxn.start();
}
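Before cnxn is created, the constructor hands connectString to ConnectStringParser and passes its getChrootPath() result to createConnection(). In this constructor the server list itself comes from hostProvider, so the parser is used here for the chroot. A connect string such as zk1:2181,zk2:2181/app lists the servers and can end with a chroot path, under which all of the client's node paths are resolved. The sketch below is a simplified, hypothetical parser that shows this split. It is not ZooKeeper's class, and it skips IPv6 literals and path validation. The default port 2181 is ZooKeeper's conventional client port.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical model of ConnectStringParser (no IPv6 handling, no path validation).
final class SimpleConnectStringParser {
    static final int DEFAULT_PORT = 2181;

    private final String chrootPath; // null when there is no chroot (or just "/")
    private final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    SimpleConnectStringParser(String connectString) {
        // Everything from the first '/' on is the chroot suffix, e.g. "h1:2181,h2:2181/app".
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            this.chrootPath = chroot.length() == 1 ? null : chroot; // a bare "/" means no chroot
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }
        // Comma-separated host[:port] entries; the port defaults to 2181.
        for (String entry : connectString.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved: no DNS lookup happens while parsing.
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }

    String getChrootPath() { return chrootPath; }

    List<InetSocketAddress> getServerAddresses() { return Collections.unmodifiableList(serverAddresses); }
}
```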
The last statement of the ZooKeeper constructor, cnxn.start(), is the entry point of the connection process. cnxn is a ClientCnxn, and ClientCnxn.start() is:
public void start() {
sendThread.start();
eventThread.start();
}
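These two lines start two threads. SendThread handles network I/O, including connecting, and EventThread delivers events to watchers. The connection logic lives in SendThread.run():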
public void run() {
// ......
while (state.isAlive()) {
try {
// Check whether we are connected to a server; if not, start connecting
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
// Prefer a read-write server discovered earlier, otherwise take the next server from the host provider
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// ......
}
// ......
} catch (Throwable e) {
// ......
}
}
// ......
}
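The code above omits how the connection request is assembled and how it is sent over the socket. Those details are not the focus of this article, so they are not covered here. With that, we have traced the client all the way to connecting to the server.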
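Stripped of socket details, the connecting branch of SendThread.run() is a small loop. While the client is neither connected nor closing, it takes a preferred read-write server if one was found, otherwise it asks the host provider for the next server, and then it attempts a connection. The sketch below models that branch synchronously. RoundRobinHostProvider and SendLoopModel are hypothetical. The real StaticHostProvider shuffles its list and backs off after cycling through all servers. The real startConnect() also completes the connection asynchronously instead of returning a boolean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical host provider: rotates through the server list (no shuffling, no back-off).
final class RoundRobinHostProvider {
    private final List<String> servers;
    private int lastIndex = -1;

    RoundRobinHostProvider(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = new ArrayList<>(servers);
    }

    // Stands in for hostProvider.next(1000), minus the spin delay.
    String next() {
        lastIndex = (lastIndex + 1) % servers.size();
        return servers.get(lastIndex);
    }
}

// Hypothetical, synchronous model of the "not connected yet" branch of SendThread.run().
final class SendLoopModel {
    private final RoundRobinHostProvider hostProvider;
    private final Predicate<String> connector; // true = simulated connection attempt succeeded
    private final List<String> attempted = new ArrayList<>();
    private String rwServerAddress;            // preferred read-write server, if one was found
    private String connectedTo;                // null while disconnected
    private volatile boolean closing;

    SendLoopModel(RoundRobinHostProvider hostProvider, Predicate<String> connector) {
        this.hostProvider = hostProvider;
        this.connector = connector;
    }

    void setRwServerAddress(String address) { this.rwServerAddress = address; }

    void close() { this.closing = true; }

    List<String> attempted() { return attempted; }

    // Bounded stand-in for while (state.isAlive()); returns the server connected to, or null.
    String run(int maxIterations) {
        for (int i = 0; i < maxIterations && connectedTo == null; i++) {
            if (closing) {
                break; // don't re-establish connection if we are closing
            }
            String serverAddress;
            if (rwServerAddress != null) {
                serverAddress = rwServerAddress; // use the preferred server once
                rwServerAddress = null;
            } else {
                serverAddress = hostProvider.next();
            }
            attempted.add(serverAddress);        // roughly onConnecting(serverAddress)
            if (connector.test(serverAddress)) { // roughly startConnect(serverAddress)
                connectedTo = serverAddress;
            }
        }
        return connectedTo;
    }
}
```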
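Summary
That is the overall process by which the client connects to the server. The next chapter walks through heartbeats, timeouts and related mechanisms.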