当前位置: 首页 > article >正文

【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

系列文章目录

【zookeeper核心源码解析】第一课:zk启动类核心流程序列图
【zookeeper核心源码解析】第二课:俯瞰QuorumPeer启动核心流程,实现选举关键流程
【zookeeper核心源码解析】第三课:leader与follower何时开始同步,如何同步数据
【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程

  • 系列文章目录
  • 1. 先看服务端初始化与连接构建的准备
  • 2. 客户端代码


1. 先看服务端初始化与连接构建的准备

在第一节中,介绍到NIOServerCnxnFactory的初始化,该类其实就是专门为客户端读写数据准备的服务端。主要构建连接与数据读写。

c class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable

在run方法中构建连接与io读写,具体代码如下:



    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                      + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

2. 客户端代码

ClientCnxn 类是客户端的入口代码。

/**
 * This class manages the socket i/o for the client. ClientCnxn maintains a list
 * of available servers to connect to and "transparently" switches servers it is
 * connected to as needed.
 *
 */

里面的EventThread专本对数据进行异步读写。感兴趣可以从run()方法进去看

       @Override
       public void run() {
          try {
             isRunning = true;
             while (true) {
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                   wasKilled = true;
                } else {
                   processEvent(event);
                }
                if (wasKilled)
                   synchronized (waitingEvents) {
                      if (waitingEvents.isEmpty()) {
                         isRunning = false;
                         break;
                      }
                   }
             }
          } catch (InterruptedException e) {
             LOG.error("Event thread exiting due to interruption", e);
          }

           LOG.info("EventThread shut down");
       }

http://www.kler.cn/a/456821.html

相关文章:

  • 单元测试入门和mockup
  • Kali Linux系统上配置Git的全局代理
  • 2024.12.30(多点通信)
  • 多分类的损失函数
  • 自动化文档处理:Azure AI Document Intelligence
  • Flink源码解析之:如何根据算法生成StreamGraph过程
  • 在STM32F103xx performance line block diagram找不到某一个外设所挂载在那条总线怎么办?
  • Docker 安装全攻略:从入门到上手
  • 快云服务器小助手getdetail存在任意文件文件读取漏洞
  • 普通部署redis伪集群模式
  • 阿里云-将旧服务器数据与配置完全迁移至新服务器
  • 25 - GRACE Mascon数据缺失月份数据插值
  • flask-admin 在modelview 视图中重写on_model_change 与after_model_change
  • Python定义类的属性
  • RTLinux和RTOS基本知识
  • 【Rust自学】7.3. use关键字 Pt.1:use的使用与as关键字
  • Kafka优势
  • yolov6算法及其改进
  • 【ETCD】【实操篇(十四)】etcd 集群备份与还原指南
  • Pandas07
  • 使用 VSCode 学习与实践 LaTeX:从插件安装到排版技巧
  • 基于三种机器学习方法的磁芯损耗预测模型
  • 使用内网穿透工具,为树莓派配置公网地址实现远程ssh
  • springboot maven 构建 建议使用 --release 21 而不是 -source 21 -target 21,因为它会自动设置系统模块的位置
  • 分别查询 user 表中 avatar 和 nickname 列为空的用户数量
  • C# 6.0版本的WebAPI接口部署到Linux服务器