当前位置: 首页 > article >正文

【Zookeeper源码走读】第一章 客户端与服务器的连接过程

前言

客户端使用的是curator 版本:5.3.0,zookeeper版本:3.6.3

一、客户端创建client对象并启动连接

CuratorFramework client = CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);

client.start();

使用curator封装的api,创建client就很简单方便。client对象创建成功之后,再调用start()方法,即完成了客户端连接服务器的过程。接下来,我们深入代码,看看这两行代码内部都做了哪些事情。

二、走读curator源码

以下是newClient方法的完整代码:

    /**
     * Create a new client
     *
     * @param connectString       list of servers to connect to
     * @param sessionTimeoutMs    session timeout
     * @param connectionTimeoutMs connection timeout
     * @param retryPolicy         retry policy to use
     * @return client
     */
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }

查看builder()的实现:

    public static Builder builder()
    {
        return new Builder();
    }

bulder()方法中,返回一个Builder对象。跟踪Builder类,其构造函数为空,但Builder类的成员变量初始化了很多其他类,现将与连接有关的类摘取如下:

    // DEFAULT_ZOOKEEPER_FACTORY定义为常量
    private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();

    // 根据上面的定义,这里的 ZookeeperFactory 使用的是DefaultZookeeperFactory类
    private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;

builder()方法执行完了,后面就是设置连接的服务器地址connectString(),超时时间sessionTimeoutMs()&connectionTimeoutMs(),重试机制retryPolicy(),最后调用build()方法,接下来就跟踪这个方法:

    public CuratorFramework build()
    {
        return new CuratorFrameworkImpl(this);
    }

build()方法返回一个新的CuratorFrameworkImpl对象,并将上面初始话的builder对象作为构造函数的参数传入。关键代码如下:

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                builder.getWaitForShutdownTimeoutMs(),
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly()
            );
    
        ......

    }

构造函数的第一行调用makeZookeeperFactory()方法,返回一个ZookeeperFactory对象,看下该方法的内部实现:

    private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory, ZKClientConfig zkClientConfig)
        {
            return new ZookeeperFactory()
            {
                @Override
                public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
                {
                    ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
                    addAuthInfos(zooKeeper);
                    return zooKeeper;
                }
            };
        }

传入makeZookeeperFactory()的第一个参数是:builder.getZookeeperFactory(),前面已经提到,初始化Builder的成员变量时,zookeeperFactory被赋值给常量DefaultZookeeperFactory。因此,从上面的代码可知,ZookeeperFactory的作用就是创建一个Zookeeper对象,该对象是由DefaultZookeeperFactory类中newZooKeeper()方法生成的。

接下来跟踪CuratorFrameworkImpl构造函数的第二行代码,即创建CuratorZookeeperClient对象:

this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                builder.getWaitForShutdownTimeoutMs(),
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly()
            );

CuratorZookeeperClient的构造函数如下:

    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly)
    {
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }

        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

        this.connectionTimeoutMs = connectionTimeoutMs;
        this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
        
        // zookeeperFactory是用来创建zookeeper对象的,该参数被传入了ConnectionState方法
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);
    }

关注上面的带注释那行代码,因此我们继续跟踪ConnectionState类的代码:

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
    {
        this.ensembleProvider = ensembleProvider;
        this.tracer = tracer;
        if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }
        
        handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }

ConnectionState的构造函数中,除了赋值,还初始化了HandleHolder类,HandleHolder构造函数仅仅初始化变量,没有其他业务,无需继续跟踪。

至此,client = CuratorFrameworkFactory.newClient()方法的初始化流程已经全部走完。

接下来看看 client.start() 的实现中是如何调用连接过程的,start()方法是在 CuratorFrameworkImpl 中实现的,代码如下:

public void start() {
        log.info("Starting");
        ......  
        // 重点关注这行代码,这里的client是CuratorZookeeperClient的对象
        client.start();
        ...... 
        }
    }

start()方法里面,又调用了CuratorZookeeperClient类里面的start()方法,代码如下:

    /**
     * Must be called after construction
     *
     * @throws IOException errors
     */
    public void start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }

        // // state是ConnectionState的对象
        state.start();
    }

由前面初始化的流程可知,ConnectionState是在初始化CuratorZookeeperClient的构造函数中被初始化的,进入ConnectionState的start方法,代码如下:

    void start() throws Exception
    {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

查看reset()的实现:

    synchronized void reset() throws Exception
    {
        log.debug("reset");

        instanceIndex.incrementAndGet();

        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        handleHolder.closeAndReset();
        handleHolder.getZooKeeper();   // initiate connection
    }

源码中的注释,initiate connection 说明连接是在handleHolder.getZooKeeper()方法中实现的,查看其源码验证是否正确:

    ZooKeeper getZooKeeper() throws Exception
    {
        return (helper != null) ? helper.getZooKeeper() : null;
    }

从上面的代码可知,getZooKeeper()方法应该返回helper.getZooKeeper()的值,而helper的初始化是在helper.getZooKeeper()的前一句代码handleHolder.closeAndReset()中初始化的,helper.getZooKeeper()的源码如下:

void closeAndReset() throws Exception
    {
        internalClose(0);

        Helper.Data data = new Helper.Data();   // data shared between initial Helper and the un-synchronized Helper
        // first helper is synchronized when getZooKeeper is called. Subsequent calls
        // are not synchronized.
        //noinspection NonAtomicOperationOnVolatileField

        // helper的初始化入口
        helper = new Helper(data)
        {
            @Override
            ZooKeeper getZooKeeper() throws Exception
            {
                synchronized(this)
                {
                    if ( data.zooKeeperHandle == null )
                    {
                        resetConnectionString(ensembleProvider.getConnectionString());
                        // zookeeperFactory的值是前面提到的DefaultZookeeperFactory对象
                        data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    helper = new Helper(data);

                    return super.getZooKeeper();
                }
            }
        };
    }

上面的代码有一点点绕,不过其最终调用的是DefaultZookeeperFactory的newZooKeeper()方法,代码如下:

public class DefaultZookeeperFactory implements ZookeeperFactory
{
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
    {
        return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}

newZookeeper()方法,又调用了ZookeeperAdmin,ZookeeperAdmin是在zookeeper中实现的,不在curator中,因此源码见下一节的内容。

三、走读Zookeeper源码

    public ZooKeeperAdmin(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly) throws IOException {
        super(connectString, sessionTimeout, watcher, canBeReadOnly);
    }

ZookeeperAdmin是Zookeeper的子类,其最终调用的是Zookeeper的如下构造方法:

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly,
        HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
        LOG.info(
            "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
            connectString,
            sessionTimeout,
            watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;

        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);

         // 创建连接的关键代码就是这里
        cnxn.start();
    }

Zookeeper类的构造方法的最后一行代码:cnxn.start(),就是创建连接的入口,cnxn是ClientCnxn的对象,ClientCnxn.start()方法的源码如下:

    public void start() {
        sendThread.start();
        eventThread.start();
    }

上述两行代码就是启动两个线程,而在SendThread线程类的run方法里面,涉及到连接的过程:

public void run() {
            ......

            while (state.isAlive()) {
                try {

                    // // 这部分就是判断当前是否已和服务器连接,如果未连接,则开始连接
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        onConnecting(serverAddress);
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                    }
                        
            }
            ......       
           
        }

上面的代码,未提及如何组装连接对象以及如何发送socket的部分,因为不是本文的重点,所以就不展开描述了。至此,客户端连接服务器的流程算是完成了。


总结

以上就是客户端与服务器大致的连接过程,下文开始走读心跳,超时等机制。


http://www.kler.cn/a/16261.html

相关文章:

  • 3. Sharding-Jdbc核⼼流 程+多种分⽚策略
  • unity 一个物体随键盘上下左右旋转和前进的脚本
  • Dolby TrueHD和Dolby Digital Plus (E-AC-3)编码介绍
  • 国家网络安全法律法规
  • git下载慢下载不了?Git国内国外下载地址镜像,git安装视频教程
  • Linux基础1
  • 麓言信息设计创意思维,打开设计师思路
  • 智慧物流信息系统开发需具备哪些功能?
  • 2023北京老博会(中国国际老年产业博览会)展位预订迎高峰
  • 鸿蒙系统是什么?鸿蒙与开源鸿蒙的关系?鸿蒙系统的发展历程
  • H2O生成——屏障
  • 论文笔记:Model-Contrastive Federated Learning
  • TPM-TPM-Profile-PTP协议-2
  • Vue3事件绑定
  • 【五一创作】50道Java面试题
  • Python的一些知识
  • 做了一年csgo搬砖项目,还清所有债务:会赚钱的人都在做这件事 !
  • 更轻更好用的蓝牙耳机,日常佩戴更舒适,QCY Crossky Link体验
  • Nginx:常见的面试题和答案
  • Delphi 内存分配
  • Java程序猿搬砖笔记(十二)
  • 记录和传播知识的重要性不亚于创造知识本身【专利所保护的,主要是`流程`、`工艺`和`方法`,不是一个具体的产品。】
  • 与贵州公安面对面|欧科云链天眼中国行,他们都说“行”
  • 点亮第一个LED灯
  • vue3+element-plus角色权限管理分配
  • 使用cube studio开发机器学习建模的pipeline