Netty源码—3.Reactor线程模型三
大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void select(boolean oldWakenUp) throws IOException {
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
(2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
...
}
}
...
}
(3)NioEventLoop.run()方法的执行流程
NioEventLoop.run() -> for(;;)
select() //执行一次事件轮询检查是否有IO事件
processSelectedKeys() //处理产生IO事件的Channel
runAllTasks() //处理异步任务队列
//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//Boolean that controls determines if a blocked Selector.select should break out of its selection process.
//In our case we use a timeout for the select method and the select method will block for that time unless waken up.
private final AtomicBoolean wakenUp = new AtomicBoolean();
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
}
}
...
}
如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
(2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();//当前时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();//非阻塞执行select操作
selectCnt = 1;
}
break;
}
...
}
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
...
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
...
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
...
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
public long deadlineNanos() {
return deadlineNanos;
}
...
}
(3)轮询中发现有任务加入则中断本次轮询
注意:Netty的任务队列包括普通任务和定时任务。定时任务快开始时需要中断本次轮询,普通任务队列非空时也需要中断本次轮询。
Netty为了保证普通任务队列里的普通任务能够及时执行,在调用selector.select()方法进行阻塞式select操作前会判断普通任务队列是否为空。如果不为空,那么就调用selector.selectNow()方法执行一次非阻塞select操作,然后跳出循环。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
int selectCnt = 0;
...
for(;;) {
...
//2.轮询过程中发现有任务加入,中断本次轮询
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();//非阻塞式执行select操作
selectCnt = 1;
break;
}
...
}
}
...
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final Queue<Runnable> tailTasks;
...
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final Queue<Runnable> taskQueue;//普通任务队列
...
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
...
}
(4)执行阻塞式select操作
一.最多阻塞到第一个定时任务的开始时间
二.外部线程提交任务会唤醒Reactor线程
三.是否中断本次轮询的判断条件
一.最多阻塞到第一个定时任务的开始时间
执行到这一步,说明Netty的普通任务队列里的队列为空,并且所有定时任务的开始时间还未到(大于0.5ms)。于是便进行一次阻塞式select操作,一直阻塞到第一个定时任务的开始时间,也就是把timeoutMills作为参数传入select()方法中。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
...
for(;;) {
...
//3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
...
}
}
...
}
二.外部线程提交任务会唤醒Reactor线程
如果第一个定时任务的延迟时间非常长,比如一小时,那么有可能线程会一直阻塞在select操作(select完还是会返回的)。但只要这段时间内有新任务加入,该阻塞就会被释放。
比如当有外部线程执行NioEventLoop的execute()方法添加任务时,就会调用NioEventLoop的wakeUp()方法来通过selector.wakeup()方法,去唤醒正在执行selector.select(timeoutMills)而被阻塞的线程。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//Boolean that controls determines if a blocked Selector.select should break out of its selection process.
//In our case we use a timeout for the select method and the select method will block for that time unless waken up.
private final AtomicBoolean wakenUp = new AtomicBoolean();
Selector selector;
...
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
//调用NioEventLoop.wakeup()方法唤醒正在执行selector.select(timeoutMills)而被阻塞的线程
wakeup(inEventLoop);
}
}
...
}
三.是否中断本次轮询的判断条件
阻塞式select操作结束后,Netty又会做一系列状态判断来决定是否中断本次轮询,如果满足如下条件就中断本次轮询:
条件一:检测到IO事件
条件二:被用户主动唤醒
条件三:普通任务队列里有任务需要执行
条件四:第一个定时任务即将要被执行
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
...
for(;;) {
...
//3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//阻塞式select操作结束后,Netty又会做一系列状态判断来决定是否中断本次轮询
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something, 检测到IO事件
// - waken up by user, 被用户主动唤醒
// - the task queue has a pending task. 普通任务队列里有任务需要执行
// - a scheduled task is ready for processing 第一个定时任务即将要被执行
break;
}
...
}
}
...
}
(5)避免JDK的空轮询Bug
JDK空轮询Bug会导致selector一直空轮询,最终导致CPU的利用率100%。
一.Netty避免JDK空轮询的方法
首先每次执行selector.select(timeoutMillis)之前都会记录开始时间,在阻塞式select操作后记录结束时间。
然后判断阻塞式select操作是否持续了至少timeoutMillis时间。如果阻塞式select操作持续的时间大于等于timeoutMillis,说明这是一次有效的轮询,于是重置selectCnt为1。如果阻塞式select操作持续的时间小于timeoutMillis,则说明可能触发了JDK的空轮询Bug,于是自增selectCnt。当持续时间很短的select操作的次数selectCnt超过了512次,那么就重建Selector。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
intselectCnt = 0;
long currentTimeNanos = System.nanoTime();//记录开始时间
...
for(;;) {
...
int selectedKeys = selector.select(timeoutMillis);//进行阻塞式select操作
selectCnt++;//select操作持续时间很短,可能出现空轮询,selectCnt需要自增
long time = System.nanoTime();//记录结束时间
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
//如果select操作持续的时间大于timeoutMillis,说明这是一次有效的轮询,重置selectCnt为1
selectCnt = 1;
} else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//如果持续时间很短的select操作的次数超过了512次,就重建selector
rebuildSelector();//重建Selector
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
...
}
}
...
}
二.重建Selector的逻辑
重建Selector的逻辑就是通过openSelector()方法创建一个新的Selector,然后执行一个无限的for循环,只要执行过程中出现一次并发修改SelectionKeys异常,那么就重新开始转移,直到转移完成。
具体的转移步骤为:首先拿到有效的key,然后取消该key在旧Selector上的事件注册。接着将该key对应的Channel注册到新的Selector上,最后重新绑定Channel和新的key。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
//Replaces the current Selector of this event loop with newly created Selectors to work around the infamous epoll 100% CPU bug.
public void rebuildSelector() {
final Selector oldSelector = selector;
final Selector newSelector = openSelector();
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
//1.拿到有效的key
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
//2.取消该key在旧Selector上的事件注册
key.cancel();
//3.将该key对应的Channel注册到新的Selector上
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
//4.重新绑定Channel和新的key
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels++;
}
break;
} catch(ConcurrentModificationException e) {
continue;
}
}
selector = newSelector;
oldSelector.close();
}
...
}
(6)执行一次事件轮询的总结
关于Reactor线程的select操作所做的事情:
简单来说就是:
不断轮询是否有IO事件发生,并且在轮询过程中不断检查是否有任务需要执行,从而保证Netty任务队列中的任务都能够及时执行,以及在轮询过程中会巧妙地使用一个计数器来避开JDK的空轮询Bug。
详细来说就是:
NioEventLoop的select()方法首先会判断有没有定时任务快到要开始的时间了、普通任务队列taskQueue里是否存在任务。如果有就调用selector.selectNow()进行非阻塞式的select操作,如果都没有就调用selector.select(timeoutMillis)进行阻塞式select操作。在阻塞式select操作结束后,会判断这次select操作是否阻塞了timeoutMillis这么长时间。如果没有阻塞那么长时间就表明可能触发了JDK的空轮询Bug,接下来就会继续判断可能触发空轮询Bug的次数是否达到了512次,如果达到了就通过替换原来Selector的方式去避开空轮询Bug。
7.Reactor线程处理产生IO事件的Channel
(1)处理IO事件的关键逻辑
(2)Netty对selectedKeys的优化
(3)处理IO事件的过程说明
(4)处理IO事件的总结
(1)处理IO事件的关键逻辑
Reactor线程执行的第一步是轮询出注册在Selector上的IO事件,第二步便是处理这些IO事件了。
processSelectedKeys()的关键逻辑包含两部分:
一.针对selectedKeys的优化
二.processSelectedKeysOptimized()方法真正处理IO事件
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
...
}
(2)Netty对selectedKeys的优化
Netty对selectedKeys的所有优化都是在NioEventLoop的openSelector()方法中体现的。
这个优化指的是:
Selector.select()操作每次都会把就绪状态的IO事件添加到Selector底层的两个HashSet成员变量中,而Netty会通过反射的方式将Selector中用于存放SelectionKey的HashSet替换成数组,使得添加SelectionKey的时间复杂度由HashSet的O(n)降为数组的O(1)。
具体来说就是:
NioEventLoop的成员变量selectedKeys是一个SelectedSelectionKeySet对象,会在NioEventLoop的openSelector()方法中创建。之后openSelector()方法会通过反射将selectedKeys与Selector的两个成员变量绑定。SelectedSelectionKeySet继承了AbstractSet,但底层是使用数组来存放SelectionKey的。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
...
private Selector openSelector() {
final Selector selector = provider.openSelector();
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
...
//下面的selectorImplClass对应于sun.nio.ch.SelectorImpl
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
//这两个set才是优化中的精华,一句话总结就是:
//用数组来替换Selector中用于存放SelectionKey的HashSet的实现,做到add()方法的时间复杂度为O(1)
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
...
selectedKeys = selectedKeySet;
...
}
...
}
//SelectedSelectionKeySet继承了AbstractSet,说明该类可以当作一个Set来用,但是底层使用两个数组来交替使用
//在add()方法中,首先判断当前应该使用哪个数组,然后找到对应的数组执行如下3个步骤:
//步骤一:将SelectionKey放入数组的尾部
//步骤二:更新该数组的逻辑长度+1
//步骤三:如果该数组的逻辑长度等于该数组的物理长度,就将该数组扩容
//待程序运行一段时间后,等数组的长度足够长,每次轮询到NIO事件的时候,
//调用这里的add()方法只需要O(1)的时间复杂度就能将SelectionKey放入到Set中,
//而JDK底层使用的HashSet的put()方法的时间复杂度最小是O(1)、最大是O(n),
//使用数组替换HashSet还有一个好处就是遍历的时候非常高效
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
keysB = keysA.clone();
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
//返回一个数组
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
...
}
//可以看到,SelectorImpl的两个成员变量selectedKeys和keys都是HashSet
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
...
}
(3)处理IO事件的过程说明
说明一:
首先取出IO事件。IO事件是以数组的形式从selectedKeys中取的,其对应的Channel则由SelectionKey的attachment()方法返回。
此时可以体会到优化过的selectedKeys的好处。因为遍历时遍历的是数组,相对JDK原生的HashSet,效率有所提高。
拿到当前的SelectionKey之后,便将selectedKeys[i]设置为null,这样做是为了方便GC。因为假设一个NioEventLoop平均每次轮询出N个IO事件,高峰期轮询出3N个事件,那么selectedKeys的物理长度要大于等于3N。如果每次处理这些key时不设置selectedKeys[i]为null,那么高峰期一过,这些保存在数组尾部的selectedKeys[i]对应的SelectionKey将一直无法被回收,虽然SelectionKey对应的对象可能不大,但其关联的attachment则可能很大。这些对象如果一直存活无法回收,就可能发生内存泄露。
说明二:
然后获取当前SelectionKey对应的attachment。这个attachement就是取出的IO事件对应的Channel了,于是接下来就可以处理该Channel了。
由于Netty在注册服务端Channel时,会将AbstractNioChannel内部的SelectableChannel对象注册到Selector对象上,并且将AbstractNioChannel作为SelectableChannel对象的一个attachment附属。所以当JDK轮询出某个SelectableChannel有IO事件时,就可以通过attachment()方法直接取出AbstractNioChannel进行操作了。
说明三:
接着便会调用processSelectedKey()方法对SelectionKey和AbstractNioChannel进行处理。Netty有两大类Channel:一个是NioServerSocketChannel,由bossGroup处理。另一个是NioSocketChannel,由workerGroup处理。对于boss的NioEventLoop来说,轮询到的是连接事件。对于worker的NioEventLoop来说,轮询到的是读写事件。
说明四:
最后会判断是否再进行一次轮询。NioEventLoop的run()方法每次在轮询到IO事件后,都会将needsToSelectAgain设置为false。只有当Channel从Selector上移除时,也就是调用NioEventLoop的cancel()方法时,发现被取消的key已经达到256次了,才会将needsToSelectAgain设置为true。当needsToSelectAgain为true,就会调用selectAgain()方法再进行一次轮询。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
private static final int CLEANUP_INTERVAL = 256;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
needsToSelectAgain = false;
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//1.首先取出IO事件
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;//Help GC
//2.然后获取对应的Channel和处理该Channel
//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//网络事件的处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3.最后判断是否应该再进行一次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
//selectedKeys.flip()会返回一个数组
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
//If the channel implementation throws an exception because there is no event loop,
//we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.
return;
}
//Only close ch if ch is still registerd to this EventLoop.
//ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process,
//but the channel is still healthy and should not be closed.
if (eventLoop != this || eventLoop == null) {
return;
}
//close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
//the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
//Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
...
}
(4)处理IO事件的总结
Netty默认情况下会通过反射,将Selector底层用于存放SelectionKey的两个HashSet,转化成一个数组来提升处理IO事件的效率。
在处理每一个SelectionKey时都会拿到对应的一个attachment,而这个attachment就是在服务端Channel注册Selector时所绑定的一个AbstractNioChannel。所以在处理每一个SelectionKey时,都可以找到对应的AbstractNioChannel,然后通过Pipeline将处理串行到ChannelHandler,回调到用户的方法。