当前位置: 首页 > article >正文

Netty源码—3.Reactor线程模型三

大纲

5.NioEventLoop的执行总体框架

6.Reactor线程执行一次事件轮询

7.Reactor线程处理产生IO事件的Channel

8.Reactor线程处理任务队列之添加任务

9.Reactor线程处理任务队列之执行任务

10.NioEventLoop总结

5.NioEventLoop的执行总体框架

(1)Reactor线程所做的三件事情

(2)处理多久IO事件就执行多久任务

(3)NioEventLoop.run()方法的执行流程

(1)Reactor线程所做的三件事情

NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。

一.首先是调用select()方法进行一次事件轮询

由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。

二.然后调用processSelectedKeys()方法处理轮询出来的IO事件

三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    private volatile int ioRatio = 50;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }

    private void select(boolean oldWakenUp) throws IOException {
        for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            //2.轮询过程中发现有任务加入,中断本次轮询
            //3.阻塞式select操作: selector.select(timeoutMills)
            //4.避免JDK空轮询Bug
        }
    }
    ...
}

(2)处理多久IO事件就执行多久任务

在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    private volatile int ioRatio = 50;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
            ...
        }
    }
    ...
}

(3)NioEventLoop.run()方法的执行流程

NioEventLoop.run() -> for(;;)
  select() //执行一次事件轮询检查是否有IO事件
  processSelectedKeys() //处理产生IO事件的Channel
  runAllTasks() //处理异步任务队列
//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的

6.Reactor线程执行一次事件轮询

(1)执行select操作前设置wakeUp变量

(2)定时任务快开始了则中断本次轮询

(3)轮询中发现有任务加入则中断本次轮询

(4)执行阻塞式select操作

(5)避免JDK的空轮询Bug

(6)执行一次事件轮询的总结

(1)执行select操作前设置wakeUp变量

NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    //Boolean that controls determines if a blocked Selector.select should break out of its selection process. 
    //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
        }
    }
    ...
}

如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            //2.轮询过程中发现有任务加入,中断本次轮询
            //3.阻塞式select操作: selector.select(timeoutMills)
            //4.避免JDK空轮询Bug
        }
    }
    ...
}

(2)定时任务快开始了则中断本次轮询

NioEventLoop中的Reactor线程的select操作也是一个for循环。

在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。

Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();//当前时间
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
        for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();//非阻塞执行select操作
                    selectCnt = 1;
                }
                break;
            }
            ...
        }
    }
    ...
}

//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    ...
    protected long delayNanos(long currentTimeNanos) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }
        return scheduledTask.delayNanos(currentTimeNanos);
    }
    ...
}

//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
    ...
    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        if (scheduledTaskQueue == null) {
            return null;
        }
        return scheduledTaskQueue.peek();
    }
    ...
}

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
    ...
    public long delayNanos(long currentTimeNanos) {
        return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
    }
    public long deadlineNanos() {
        return deadlineNanos;
    }
    ...
}

(3)轮询中发现有任务加入则中断本次轮询

注意:Netty的任务队列包括普通任务和定时任务。定时任务快开始时需要中断本次轮询,普通任务队列非空时也需要中断本次轮询。

Netty为了保证普通任务队列里的普通任务能够及时执行,在调用selector.select()方法进行阻塞式select操作前会判断普通任务队列是否为空。如果不为空,那么就调用selector.selectNow()方法执行一次非阻塞select操作,然后跳出循环。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        int selectCnt = 0;
        ...
        for(;;) {
            ...
            //2.轮询过程中发现有任务加入,中断本次轮询
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();//非阻塞式执行select操作
                selectCnt = 1;
                break;
            }
            ...
        }
    }
    ...
}

//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    private final Queue<Runnable> tailTasks;
    ...
    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }
    ...
}

//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private final Queue<Runnable> taskQueue;//普通任务队列
    ...
    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
    ...
}

(4)执行阻塞式select操作

一.最多阻塞到第一个定时任务的开始时间

二.外部线程提交任务会唤醒Reactor线程

三.是否中断本次轮询的判断条件

一.最多阻塞到第一个定时任务的开始时间

执行到这一步,说明Netty的普通任务队列里的队列为空,并且所有定时任务的开始时间还未到(大于0.5ms)。于是便进行一次阻塞式select操作,一直阻塞到第一个定时任务的开始时间,也就是把timeoutMills作为参数传入select()方法中。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        ...
        for(;;) {
            ...
            //3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            ...
        }
    }
    ...
}

二.外部线程提交任务会唤醒Reactor线程

如果第一个定时任务的延迟时间非常长,比如一小时,那么有可能线程会一直阻塞在select操作(select完还是会返回的)。但只要这段时间内有新任务加入,该阻塞就会被释放。

比如当有外部线程执行NioEventLoop的execute()方法添加任务时,就会调用NioEventLoop的wakeUp()方法来通过selector.wakeup()方法,去唤醒正在执行selector.select(timeoutMills)而被阻塞的线程。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    //Boolean that controls determines if a blocked Selector.select should break out of its selection process. 
    //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    Selector selector;
    ...
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
}

//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
}

//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    ...
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            //调用NioEventLoop.wakeup()方法唤醒正在执行selector.select(timeoutMills)而被阻塞的线程
            wakeup(inEventLoop);
        }
    }
    ...
}

三.是否中断本次轮询的判断条件

阻塞式select操作结束后,Netty又会做一系列状态判断来决定是否中断本次轮询,如果满足如下条件就中断本次轮询:

条件一:检测到IO事件

条件二:被用户主动唤醒

条件三:普通任务队列里有任务需要执行

条件四:第一个定时任务即将要被执行

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        ...
        for(;;) {
            ...
            //3.阻塞式select操作: selector.select(timeoutMills),最多阻塞timeoutMills时间
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            //阻塞式select操作结束后,Netty又会做一系列状态判断来决定是否中断本次轮询
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something, 检测到IO事件
                // - waken up by user, 被用户主动唤醒
                // - the task queue has a pending task. 普通任务队列里有任务需要执行
                // - a scheduled task is ready for processing 第一个定时任务即将要被执行
                break;
            }
            ...
        }
    }
    ...
}

(5)避免JDK的空轮询Bug

JDK空轮询Bug会导致selector一直空轮询,最终导致CPU的利用率100%。

一.Netty避免JDK空轮询的方法

首先每次执行selector.select(timeoutMillis)之前都会记录开始时间,在阻塞式select操作后记录结束时间。

然后判断阻塞式select操作是否持续了至少timeoutMillis时间。如果阻塞式select操作持续的时间大于等于timeoutMillis,说明这是一次有效的轮询,于是重置selectCnt为1。如果阻塞式select操作持续的时间小于timeoutMillis,则说明可能触发了JDK的空轮询Bug,于是自增selectCnt。当持续时间很短的select操作的次数selectCnt超过了512次,那么就重建Selector。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
    ...
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        intselectCnt = 0;
        long currentTimeNanos = System.nanoTime();//记录开始时间
        ...
        for(;;) {
            ...
            int selectedKeys = selector.select(timeoutMillis);//进行阻塞式select操作
            selectCnt++;//select操作持续时间很短,可能出现空轮询,selectCnt需要自增
            long time = System.nanoTime();//记录结束时间
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                //如果select操作持续的时间大于timeoutMillis,说明这是一次有效的轮询,重置selectCnt为1
                selectCnt = 1;
            } else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                //如果持续时间很短的select操作的次数超过了512次,就重建selector
                rebuildSelector();//重建Selector
                selector = this.selector;
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
            ...
        }
    }
    ...
}

二.重建Selector的逻辑

重建Selector的逻辑就是通过openSelector()方法创建一个新的Selector,然后执行一个无限的for循环,只要执行过程中出现一次并发修改SelectionKeys异常,那么就重新开始转移,直到转移完成。

具体的转移步骤为:首先拿到有效的key,然后取消该key在旧Selector上的事件注册。接着将该key对应的Channel注册到新的Selector上,最后重新绑定Channel和新的key。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    //Replaces the current Selector of this event loop with newly created Selectors to work around the infamous epoll 100% CPU bug.
    public void rebuildSelector() {
        final Selector oldSelector = selector;
        final Selector newSelector = openSelector();
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    //1.拿到有效的key
                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                        continue;
                    }
        
                    int interestOps = key.interestOps();
                    //2.取消该key在旧Selector上的事件注册
                    key.cancel();
                    //3.将该key对应的Channel注册到新的Selector上
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        //4.重新绑定Channel和新的key
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels++;
                }
                break;
            } catch(ConcurrentModificationException e) {
                continue;
            }
        }
        selector = newSelector;
        oldSelector.close();
    }
    ...
}

(6)执行一次事件轮询的总结

关于Reactor线程的select操作所做的事情:

简单来说就是:

不断轮询是否有IO事件发生,并且在轮询过程中不断检查是否有任务需要执行,从而保证Netty任务队列中的任务都能够及时执行,以及在轮询过程中会巧妙地使用一个计数器来避开JDK的空轮询Bug。

详细来说就是:

NioEventLoop的select()方法首先会判断有没有定时任务快到要开始的时间了、普通任务队列taskQueue里是否存在任务。如果有就调用selector.selectNow()进行非阻塞式的select操作,如果都没有就调用selector.select(timeoutMillis)进行阻塞式select操作。在阻塞式select操作结束后,会判断这次select操作是否阻塞了timeoutMillis这么长时间。如果没有阻塞那么长时间就表明可能触发了JDK的空轮询Bug,接下来就会继续判断可能触发空轮询Bug的次数是否达到了512次,如果达到了就通过替换原来Selector的方式去避开空轮询Bug。

7.Reactor线程处理产生IO事件的Channel

(1)处理IO事件的关键逻辑

(2)Netty对selectedKeys的优化

(3)处理IO事件的过程说明

(4)处理IO事件的总结

(1)处理IO事件的关键逻辑

Reactor线程执行的第一步是轮询出注册在Selector上的IO事件,第二步便是处理这些IO事件了。

processSelectedKeys()的关键逻辑包含两部分:

一.针对selectedKeys的优化

二.processSelectedKeysOptimized()方法真正处理IO事件

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip()会返回一个数组
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    ...
}

(2)Netty对selectedKeys的优化

Netty对selectedKeys的所有优化都是在NioEventLoop的openSelector()方法中体现的。

这个优化指的是:

Selector.select()操作每次都会把就绪状态的IO事件添加到Selector底层的两个HashSet成员变量中,而Netty会通过反射的方式将Selector中用于存放SelectionKey的HashSet替换成数组,使得添加SelectionKey的时间复杂度由HashSet的O(n)降为数组的O(1)。

具体来说就是:

NioEventLoop的成员变量selectedKeys是一个SelectedSelectionKeySet对象,会在NioEventLoop的openSelector()方法中创建。之后openSelector()方法会通过反射将selectedKeys与Selector的两个成员变量绑定。SelectedSelectionKeySet继承了AbstractSet,但底层是使用数组来存放SelectionKey的。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    ...
    private Selector openSelector() {
        final Selector selector = provider.openSelector();
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        ...
        //下面的selectorImplClass对应于sun.nio.ch.SelectorImpl
        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);
        //这两个set才是优化中的精华,一句话总结就是:
        //用数组来替换Selector中用于存放SelectionKey的HashSet的实现,做到add()方法的时间复杂度为O(1)
        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);
        ...
        selectedKeys = selectedKeySet;
        ...
    }
    ...
}

//SelectedSelectionKeySet继承了AbstractSet,说明该类可以当作一个Set来用,但是底层使用两个数组来交替使用
//在add()方法中,首先判断当前应该使用哪个数组,然后找到对应的数组执行如下3个步骤:
//步骤一:将SelectionKey放入数组的尾部
//步骤二:更新该数组的逻辑长度+1
//步骤三:如果该数组的逻辑长度等于该数组的物理长度,就将该数组扩容

//待程序运行一段时间后,等数组的长度足够长,每次轮询到NIO事件的时候,
//调用这里的add()方法只需要O(1)的时间复杂度就能将SelectionKey放入到Set中,
//而JDK底层使用的HashSet的put()方法的时间复杂度最小是O(1)、最大是O(n),
//使用数组替换HashSet还有一个好处就是遍历的时候非常高效
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }
        return true;
    }
    
    //返回一个数组
    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }
    ...
}

//可以看到,SelectorImpl的两个成员变量selectedKeys和keys都是HashSet
public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    private Set<SelectionKey> publicKeys;
    private Set<SelectionKey> publicSelectedKeys;

    protected SelectorImpl(SelectorProvider var1) {
        super(var1);
        if (Util.atBugLevel("1.4")) {
            this.publicKeys = this.keys;
            this.publicSelectedKeys = this.selectedKeys;
        } else {
            this.publicKeys = Collections.unmodifiableSet(this.keys);
            this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
        }
    }
    ...
}

(3)处理IO事件的过程说明

说明一:

首先取出IO事件。IO事件是以数组的形式从selectedKeys中取的,其对应的Channel则由SelectionKey的attachment()方法返回。

此时可以体会到优化过的selectedKeys的好处。因为遍历时遍历的是数组,相对JDK原生的HashSet,效率有所提高。

拿到当前的SelectionKey之后,便将selectedKeys[i]设置为null,这样做是为了方便GC。因为假设一个NioEventLoop平均每次轮询出N个IO事件,高峰期轮询出3N个事件,那么selectedKeys的物理长度要大于等于3N。如果每次处理这些key时不设置selectedKeys[i]为null,那么高峰期一过,这些保存在数组尾部的selectedKeys[i]对应的SelectionKey将一直无法被回收,虽然SelectionKey对应的对象可能不大,但其关联的attachment则可能很大。这些对象如果一直存活无法回收,就可能发生内存泄露。

说明二:

然后获取当前SelectionKey对应的attachment。这个attachement就是取出的IO事件对应的Channel了,于是接下来就可以处理该Channel了。

由于Netty在注册服务端Channel时,会将AbstractNioChannel内部的SelectableChannel对象注册到Selector对象上,并且将AbstractNioChannel作为SelectableChannel对象的一个attachment附属。所以当JDK轮询出某个SelectableChannel有IO事件时,就可以通过attachment()方法直接取出AbstractNioChannel进行操作了。

说明三:

接着便会调用processSelectedKey()方法对SelectionKey和AbstractNioChannel进行处理。Netty有两大类Channel:一个是NioServerSocketChannel,由bossGroup处理。另一个是NioSocketChannel,由workerGroup处理。对于boss的NioEventLoop来说,轮询到的是连接事件。对于worker的NioEventLoop来说,轮询到的是读写事件。

说明四:

最后会判断是否再进行一次轮询。NioEventLoop的run()方法每次在轮询到IO事件后,都会将needsToSelectAgain设置为false。只有当Channel从Selector上移除时,也就是调用NioEventLoop的cancel()方法时,发现被取消的key已经达到256次了,才会将needsToSelectAgain设置为true。当needsToSelectAgain为true,就会调用selectAgain()方法再进行一次轮询。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    private boolean needsToSelectAgain;
    private int cancelledKeys;
    private static final int CLEANUP_INTERVAL = 256;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            needsToSelectAgain = false;
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip()会返回一个数组
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            //1.首先取出IO事件
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;//Help GC
            //2.然后获取对应的Channel和处理该Channel
            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                //网络事件的处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //3.最后判断是否应该再进行一次轮询
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
                selectAgain();
                //selectedKeys.flip()会返回一个数组
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                //If the channel implementation throws an exception because there is no event loop, 
                //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.
                return;
            }
            //Only close ch if ch is still registerd to this EventLoop. 
            //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process, 
            //but the channel is still healthy and should not be closed.
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            //close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            //the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }

            //Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            //Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    //Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
    ...
}

(4)处理IO事件的总结

Netty默认情况下会通过反射,将Selector底层用于存放SelectionKey的两个HashSet,转化成一个数组来提升处理IO事件的效率。

在处理每一个SelectionKey时都会拿到对应的一个attachment,而这个attachment就是在服务端Channel注册Selector时所绑定的一个AbstractNioChannel。所以在处理每一个SelectionKey时,都可以找到对应的AbstractNioChannel,然后通过Pipeline将处理串行到ChannelHandler,回调到用户的方法。


http://www.kler.cn/a/595471.html

相关文章:

  • L2TP实验报告
  • 无服务器架构将淘汰运维?2025年云计算形态预测
  • RabbitMQ 与 Kafka:消息中间件的终极对比与选型指南
  • MSE分类时梯度消失的问题详解和交叉熵损失的梯度推导
  • Redis哨兵模式(Sentinel)高可用方案介绍与配置实践
  • 数字孪生技术引领UI前端设计新风尚:跨平台与响应式设计的结合
  • 【Bluebell】项目总结:基于 golang 的前后端分离 web 项目实战
  • ue5蓝图项目转换为c++项目 遇到的问题
  • VectorBT:Python量化交易策略开发与回测评估详解
  • 如何缓解大语言模型推理中的“幻觉”(Hallucination)?
  • deepSpeed多机多卡训练服务器之间,和服务器内两个GPU是怎么通信
  • 识别并脱敏上传到deepseek/chatgpt的文本文件中的身份证/手机号
  • 单片机自学总结
  • 架构设计之自定义延迟双删缓存注解(上)
  • 【C++基础】Lambda 函数 基础知识讲解学习及难点解析
  • vscode连接本地mysql数据库
  • 解决python配置文件类configparser.ConfigParser,插入、读取数据,自动转为小写的问题
  • LLM之向量数据库Chroma milvus FAISS
  • SOFAStack-00-sofa 技术栈概览
  • ip2region与express最佳实践