Netty源码—3.Reactor线程模型四
大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
8.Reactor线程处理任务队列之添加任务
(1)Reactor线程执行一次事件轮询的过程
(2)任务的分类和添加说明
(3)普通任务的添加
(4)定时任务的添加
(5)Netty的定时任务机制补充
(1)Reactor线程执行一次事件轮询的过程
Reactor线程通过NioEventLoop的run()方法每进行一次事件轮询,首先会调用select()方法尝试检测出IO事件,然后会调用processSelectedKeys()方法处理检测出的IO事件。其中IO事件主要包括新连接接入事件和连接的数据读写事件,最后会调用runAllTasks()方法处理任务队列中的异步任务。
(2)任务的分类和添加说明
runAllTasks()方法中的Task包括普通任务和定时任务,分别存放于NioEventLoop不同的队列里。一个是普通的任务队列MpscQueue,另一个是定时的任务队列PriorityQueue。
普通的任务队列MpscQueue在创建NioEventLoop时创建的,然后在外部线程调用NioEventLoop的execute()方法时,会调用addTask()方法将Task保存到普通的任务队列里。
定时的任务队列PriorityQueue则是在添加定时任务时创建的,然后在外部线程调用NioEventLoop的schedule()方法时,会调用scheduleTaskQueue().add()方法将Task保存到定时的任务队列里。
(3)普通任务的添加
场景一:用户自定义普通任务
场景二:外部线程调用Channel的方法
当通过ctx.channel().eventLoop().execute(...)自定义普通任务,或者通过非Reactor线程(外部线程)调用Channel的各类方法时,最后都会执行到SingleThreadEventExecutor的execute()方法。
场景一:用户自定义普通任务
不管是外部线程还是Reactor线程执行NioEventLoop的execute()方法,都会调用NioEventLoop的addTask()方法,然后调用offerTask()方法。而offerTask()方法会使用一个taskQueue将Task保存起来。这个taskQueue其实就是一个MPSC队列,每一个NioEventLoop都会有一个MPSC队列。
Netty使用MPSC队列可以方便地将外部线程的异步任务进行聚集,然后在Reactor线程内部用单线程来批量执行以提升性能。可以借鉴Netty的这种任务执行模式来处理类似多线程数据聚合,定时上报应用。
//场景一:用户自定义普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
...
}
});
public final class NioEventLoop extends SingleThreadEventLoop {
...
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//每一个NioEventLoop会有一个MPSC队列
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//创建普通的任务队列MpscQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
...
@Override
public void execute(Runnable task) {
if (task == null) throw new NullPointerException("task");
boolean inEventLoop = inEventLoop();
//不管是外部线程还是Reactor线程执行NioEventLoop的execute()方法,都会调用NioEventLoop的addTask()方法
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
}
//Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before.
protected void addTask(Runnable task) {
if (task == null) throw new NullPointerException("task");
if (!offerTask(task)) reject(task);
}
final boolean offerTask(Runnable task) {
if (isShutdown()) reject();
return taskQueue.offer(task);
}
...
}
场景二:外部线程调用Channel的方法
这个场景是在业务线程里,根据用户标识找到对应的Channel,然后调用Channel的write()方法向该用户推送消息。
外部线程在调用Channel的write()方法时,executor.inEventLoop()会返回false。于是会将write操作封装成一个WriteTask,然后调用safeExecute()方法来执行。默认情况下会获取Channel对应的NIO线程,然后作为参数传入safeExecute()方法中进行执行,从而确保任务会由Channel对应的NIO线程执行,通过单线程执行来实现线程安全。
//场景二:当前线程为业务线程
channel.write(...)
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
final EventExecutor executor;//一般初始化时默认为null
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.pipeline = pipeline;
this.executor = executor;
...
}
...
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//外部线程在调用Channel的write()方法时,executor.inEventLoop()会返回false
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//将write操作封装成一个WriteTask
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
//调用safeExecute()来执行
safeExecute(executor, task, promise, m);
}
}
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
//调用SingleThreadEventExecutor.execute()方法
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
//传入的executor为null
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
//传入的executor为null
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
...
}
...
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//每一个NioEventLoop会有一个MPSC队列
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//创建普通的任务队列MpscQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
...
@Override
public void execute(Runnable task) {
if (task == null) throw new NullPointerException("task");
boolean inEventLoop = inEventLoop();
//不管是外部线程还是Reactor线程执行NioEventLoop的execute()方法,都会调用NioEventLoop的addTask()方法
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
}
//Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before.
protected void addTask(Runnable task) {
if (task == null) throw new NullPointerException("task");
if (!offerTask(task)) reject(task);
}
final boolean offerTask(Runnable task) {
if (isShutdown()) reject();
return taskQueue.offer(task);
}
...
}
(4)定时任务的添加
通常使用ctx.channel().eventLoop().schedule(..)自定义定时任务,其中schedule()方法会通过scheduledTaskQueue().add(task)来添加定时任务。首先scheduledTaskQueue()方法会返回一个优先级队列,然后通过该优先级队列的add()方法将定时任务对象加入到队列中。
注意,这里可以直接使用优先级队列而不用考虑多线程并发问题的原因如下。如果是外部线程调用schedule()方法添加定时任务,那么Netty会将添加定时任务这个逻辑封装成一个普通的Task。这个Task的任务是一个"添加某定时任务"的任务,而不是添加某定时任务。这样,对优先级队列的访问就变成单线程了,也就是只有Reactor线程会访问,从而不存在多线程并发问题。
//场景三:用户自定义定时任务,这也是用得最多的方法
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
...
}
}, 60, TimeUnit.SECONDS);
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
...
//添加定时任务
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
//如果当前线程是Reactor线程,则直接往PriorityQueue中添加任务
scheduledTaskQueue().add(task);
} else {
//如果是外部线程,则调用SingleThreadEventExecutor.execute()方法
//将添加定时任务这一动作也封装成一个普通任务
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
//创建定时的任务队列PriorityQueue
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
...
}
(5)Netty的定时任务机制补充
一.如何保证截止时间最近的任务优先执行
为什么定时任务要保存在优先级队列中?优先级队列的特性是会按照一定的顺序来排列内部元素,内部元素是可以比较的。由于优先级队列中的每个元素都是定时任务,所以定时任务也是可以比较的。比较的逻辑就是:先比较定时任务的截止时间,在截止时间相同的情况下再比较定时任务的添加顺序也就是ID。
二.Netty的定时任务有三种执行方式
方式一:定时任务不会被重复执行
ctx.channel().eventLoop().schedule(),传递的periodNanos为0。
方式二:每隔一段时间执行一次
ctx.channel().eventLoop().scheduleAtFixedRate(),传递的periodNanos为正数。
方式三:隔相同时间再执行一次
ctx.channel().eventLoop().scheduleWithFixedDelay(),传递的periodNanos为负数。
Netty的3种定时任务的执行逻辑是通过调整下一次任务的截止时间来运行的。首先修改完下一次执行的截止时间,然后把当前任务再次加入队列,这样就能确保任务在适当的时候执行。
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
//每个定时任务都有一个唯一的ID
private static final AtomicLong nextTaskId = new AtomicLong();
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
//标识一个任务是否重复执行,以及以何种方式执行
private final long periodNanos;
...
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
public long deadlineNanos() {
return deadlineNanos;
}
@Override
public void run() {
...
if (periodNanos == 0) {
//1.对应schedule()方法,表示一次性任务
V result = task.call();
setSuccessInternal(result);
} else {
task.call();
long p = periodNanos;
if (p > 0) {
//2.对应scheduleAtFixedRate()方法,表示以固定速率执行任务
deadlineNanos += p;
} else {
//3.对应scheduleWithFixedDelay()方法,表示以固定的延时执行任务
deadlineNanos = nanoTime() - p;
}
scheduledTaskQueue.add(this);
}
...
}
...
}
9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//每一个NioEventLoop会有一个MPSC队列
private final Queue<Runnable> taskQueue;
...
//Poll all tasks from the task queue and run them via Runnable#run() method.
//This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos.
protected boolean runAllTasks(long timeoutNanos) {
//1.转移定时任务到MPSC队列,也就是任务聚合
fetchFromScheduledTaskQueue();
//从普通的任务队列(MPSC队列)里获取任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
//2.计算本轮任务执行的截止时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//3.执行任务,通过for循环逐个执行pollTask()取出的任务
for (;;) {
//3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行
safeExecute(task);
//3.2 累加当前已执行的任务数
runTasks ++;
//3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//3.4 判断本轮任务是否已经执行完毕
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
...
//Return the Runnable which is ready to be executed with the given nanoTime.
//You should use #nanoTime() to retrieve the the correct nanoTime.
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
...
}
//Abstract base class for EventExecutor implementations.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
...
//Try to execute the given Runnable and just log if it throws a Throwable.
protected static void safeExecute(Runnable task) {
try {
task.run();//同步执行任务
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
...
}
(3)Netty性能优化之间隔策略
假设任务队列里有海量的小任务,如果每次执行完任务都需要判断是否到截止时间,那么效率是比较低的。所以Netty选择通过每隔64个任务才判断一下是否到截止时间,那么效率就会高很多。
(4)NioEventLoop.run()方法执行任务总结
Netty里的任务分两种:一种是普通的任务,另一种是定时的任务。Netty在执行这些任务时首先会把定时任务聚合到普通任务队列里,然后再从普通任务队列里获取任务逐个执行,并且是每执行64个任务之后才判断一下当前时间是否超过最大允许执行时间。如果超过就直接中断,中断之后就会进行下一次NioEventLoop.run()方法的for循环。
10.NioEventLoop总结
(1)NioEventLoop的执行流程总结
(2)Reactor线程模型总结
(3)NioEventLoop创建启动执行的总结
(1)NioEventLoop的执行流程总结
一.NioEventLoop在执行过程中首先会不断检测是否有IO事件发生,然后如果检测出有IO事件就处理IO事件,接着处理完IO事件之后再处理外部线程提交过来的异步任务。
二.在检测是否有IO事件发生时,为了保证异步任务的及时处理,只要有任务要处理,那么就立即停止检测去处理任务。
三.外部线程异步执行的任务分为两种:普通任务和定时任务。这两种任务分别保存到MPSC队列和优先级队列,而优先级队列中的任务最终都会转移到MPSC队列里进行处理。
四.Netty每处理完64个任务才会检查一次是否超时而退出执行任务的循环。
(2)Reactor线程模型总结
一.NioEventLoopGroup在用户代码中被创建,默认情况下会创建两倍CPU核数个NioEventLoop。
二.NioEventLoop是懒启动的,bossNioEventLoop在服务端启动时启动,workerNioEventLoop在新连接接入时启动。
三.当CPU核数为2的幂时,为每一个新连接绑定NioEventLoop之后,都会做一个取模运算转位与运算的优化。
四.每个连接都对应一个Channel,每个Channel都绑定唯一一个NioEventLoop,一个NioEventLoop可能会被绑定多个Channel,每个NioEventLoop都对应一个FastThreadLocalThread线程实体和一个Selector。因此单个连接的所有操作都是在一个线程上执行的,所以是线程安全的。
五.每个NioEventLoop都对应一个Selector,这个Selector可以批量处理注册到它上面的Channel。
六.每个NioEventLoop的执行过程都包括事件检测、事件处理以及异步任务的执行。
七.用户线程池在对Channel进行一些操作时均为线程安全的。这是因为Netty会把外部线程的操作都封装成一个Task放入这个Channel绑定的NioEventLoop中的MPSC队列,然后在该NioEventLoop的执行过程(事件循环)的第三个过程中进行串行执行。
八.所以NioEventLoop的职责不仅仅是处理网络IO事件,用户自定义的普通任务和定时任务也会统一由NioEventLoop处理,从而实现线程模型的统一。
九.从调度层看,也不存在从NioEventLoop线程中再启动其他类型的线程用于异步执行另外的任务,从而避免了多线程并发操作和锁竞争,提升了IO线程的处理性能和调度性能。
(3)NioEventLoop创建启动执行的总结
一.用户在创建bossGroup和workerGroup时,NioEventLoopGroup被创建,默认不传参时会创建两倍CPU核数个NioEventLoop。
二.每个NioEventLoopGroup都有一个线程执行器executor和一个线程选择器chooser。线程选择器chooser用于进行线程分配,它会针对NioEventLoop的个数进行优化。
三.NioEventLoop在创建时会创建一个Selector和一个MPSC任务队列,创建Selector时Netty会通过反射的方式用数组去替换Selector里的两个HashSet数据结构。
四.Netty的NioEventLoop在首次调用execute()方法时会启动线程,这个线程是一个FastThreadLocalThread对象。启动线程后,Netty会将创建完成的线程保存到成员变量,以便能判断执行NioEventLoop里的逻辑的线程是否是这个创建好的线程。
五.NioEventLoop的执行逻辑在run()方法里,主要包括3部分:第一是检测IO事件,第二是处理IO事件,第三是执行异步任务。