【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程
系列文章目录
【zookeeper核心源码解析】第一课:zk启动类核心流程序列图
【zookeeper核心源码解析】第二课:俯瞰QuorumPeer启动核心流程,实现选举关键流程
【zookeeper核心源码解析】第三课:leader与follower何时开始同步,如何同步数据
【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程
【zookeeper核心源码解析】第四课:客户端与服务端读写的io核心流程
- 系列文章目录
- 1. 先看服务端初始化与连接构建的准备
- 2. 客户端代码
1. 先看服务端初始化与连接构建的准备
在第一节中,介绍到NIOServerCnxnFactory的初始化,该类其实就是专门为客户端读写数据准备的服务端。主要构建连接与数据读写。
c class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
在run方法中构建连接与io读写,具体代码如下:
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
2. 客户端代码
ClientCnxn 类是客户端的入口代码。
/**
* This class manages the socket i/o for the client. ClientCnxn maintains a list
* of available servers to connect to and "transparently" switches servers it is
* connected to as needed.
*
*/
里面的EventThread专本对数据进行异步读写。感兴趣可以从run()方法进去看
@Override
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down");
}