Netty03-核心组件NioEventLoopGroup解读
NioEventLoopGroup
可以看到NioEventLoopGroup继承了MultithreadEventExecutorGroup并且实现了EventLoopGroup接口,而这两个类被ExecutorService修饰,所以NioEventLoopGroup实际上是一个线程池,池中的对象其实就是单个的NioEventLoop。
源码解读
NioEventLoopGroup 的参数初始化
// 进入无参构造
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// group所包含的executor
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// 注入单例模式的提供者
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 注入默认的策略工厂实例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) { // 注入拒绝处理器
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//默认线程数是cpu核数的两倍
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
构造真正的NioEventGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//1、
//executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接
// 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
//NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//2、
//这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现 线程池;
//数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//3、
//newChild(executor, args) 函数在NioEventLoopGroup类中实现了,
// 实质就是就是存入了一个 NIOEventLoop类实例
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//4、实例化线程工厂执行器选择器: 根据children获取选择器
chooser = chooserFactory.newChooser(children);
//5、为每个EventLoop线程添加 线程终止监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//6、将children 添加到对应的set集合中去重, 表示只可读。
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
children[i] = newChild(executor, args);
/**
* newChild(executor, args) 里的方法
* 我们可以看到 返回的就是一个 NioEventLoop
*/
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
1. NioEventLoopGroup初始化时未指定线程数,那么会使用默认线程数,即 `线程数 = CPU核心数 * 2`;
2. 每个NioEventLoopGroup对象内部都有一组可执行的`NioEventLoop数组`,其大小是 nThreads, 这样就构成了一个线程池, `一个NIOEventLoop可以理解成就是一个线程`。
3. 所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个
NIOEventLoopGroup的。这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。
4. 当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的,并调用该类的next()方法。
5. 每个NioEventLoopGroup对象都有一个NioEventLoop选择器与之对应,其会根据NioEventLoop的个数,动态选择chooser(如果是2的幂次方,则按位运算,否则使用普通的轮询)