[Netty] Selector选择器以及Reactor实现 (七)
文章目录
- 1.Nio中的Selector介绍
- 1.1 Selector
- 1.2 SelectionKey
- 1.3 ServerSocketChannel
- 1.4 SocketChannel
- 2.Netty中NioEventLoop的选择器
- 3.Netty对Reactor的实现
1.Nio中的Selector介绍
通过Selector多路复用器实现IO的多路复用, Selector可以监听多个连接的Channel事件, 同事可以不断的查询已注册Channel是否处于就绪状态, 实现一个线程高效管理多个Channel
- Selector能够同时检测对个注册的通道上是否有事件发生
多个Channel以事件的方式注册到同一个Selector。 - 如果有事件发生,就获取事件,然后对事件进行处理,只使用一个线程就管理了多个通道。
- 只有在一个通道真正有读写事件时,才会执行。
Selector、SelectionKey、ServerSocketChannel和SocketChannel的关系
- 客户端连接时候, 会通过ServerSocketChannel得到SocketChannel
- SocketChannel注册入Selector, 通过SelectableChannel注册
- 注册后返回SelectionKey
- Selector使用select方法进行监听
- 返回有事件发生的通道的SelectionKey进而获取到Channel
这里的Selector, SelectionKey, ServerSocketChannel, SocketChannel都是Nio中的。
NIO中的ServerSocketChannel功能类似java jdk中的ServerSocket,SocketChannel功能类似Socket
1.1 Selector
- slector.select();//阻塞
- slector.select(1000);//阻塞1000毫秒,在1000毫秒后返回
- slector.WAKEUP();//唤醒slector
- slector.selectNow();//不阻塞,立马返回
1.2 SelectionKey
标识Selector和网络通道的注册关系
public abstract class SelectionKey {
public abstract Selector selector();//得到与之关联的Selector对象
public abstract SelectableChannel channel();//得到与之关联的通道
public final Object attachment();//得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops);//设置或改变监听事件
public final boolean isAcceptable();//是否可以accept
public final boolean isReadable();//是否可以读
public final boolean isWritable();//是否可以写
}
- int OP_ACCEPT:有新的网络连接可以accept,值为16
- int OP_CONNECT:代表连接已经建立,值为8
- int OP_READ:代表读操作,值为1
- int OP_WRITE:代表写操作,值为4
1.3 ServerSocketChannel
ServerSocketChannel在服务器端监听薪的客户端Socket连接
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
public static ServerSocketChannel open();//得到一个ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local);//设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false标识采用非阻塞模式
public abstract SocketChannel accept();//接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops);//注册一个选择器并设置监听事件
}
1.4 SocketChannel
网络IO通道, 具体负责进行读写操作, Nio把缓冲区的数据写入到通道, 或者把通道的数据写入缓冲区
public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
public static SocketChannel open();//得到一个SocketChannel通道
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false标识采用非阻塞模式
public abstract boolean connect(SocketAddress remote);//连接服务器
public abstract boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public abstract int write(ByteBuffer src);//往通道里写数据
public abstract int read(ByteBuffer dst);//从通道里读数据
public abstract SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close();//关闭通道
}
2.Netty中NioEventLoop的选择器
Netty中的选择器是通过NioEventLoop.run()方法中的select方法去调用Nio包下的selector选择器实现的
NioEventLoop.run()
run 方法的主基调肯定是死循环等待 I/O 事件产生, 然后处理事件
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
- select 等待 I/O 事件的发生
- 处理发生的 I/O 事件
- 处理提交至线程中的任务, 包括提交的异步任务、定时任务、尾部任务
这个策略会根据最近将要发生的定时任务的执行时间来控制 select 最长阻塞的时间。
select方法中的是Nio包下的selector
3.Netty对Reactor的实现
- Reactor: I/O多路复用, 当多条连接共用一个阻塞对象后,进程只需要在一个阻塞对象上等待,而无需再轮训所有连接,常见的实现方式有select,epoll,kqueue等。
- Reactor: 是一种开发模式, 为反应堆, 代表事件反应, 有单线程, 多线程, 主从多线程
Reactor单线程模式
EventLoopGroup eventGroup = new NioEventLoopGroup(1)
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);
非主从Reactor多线程模式
EventLoopGroup eventGroup = new NioEventLoopGroup()
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);
主从Reactor多线程模式
EventLoopGroup bossGroup = new NioEventLoopGroup()
EventLoopGroup workerGroup = new NioEventLoopGroup()
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);