Netty核心源码与优化
1.Netty的优化
1.1 使用EventLoop的任务调度
直接使用 channel.writeAndFlush(data) 可能会导致线程切换,这是因为如果当前线程并不是该 Channel 所绑定的 EventLoop 线程,那么 writeAndFlush() 操作会将任务重新提交给关联的 EventLoop 线程执行,因此,最好确保在 EventLoop 的线程内执行 channel.writeAndFlush(data),以避免不必要的线程切换:
channel.eventLoop().execute(new Runnable){
@Override
public void run(){
channel.writeAndFlush(data);
}
});
1.2 减少ChannelPipeline的调用长度
使用ctx.channel.writeAndFlush(msg)时,会跳过ctx(ChannelHandlerContext)的调用链,消息会从ChannelPipeline中的头部开始进入ChannelPipeline的出站处理器,意味着消息将会从Pipeline的第一个ChannelOutboundHandler开始处理,从而导致增加ChannelPipeline的调用长度;
而ctx.writeAndFlush(msg)是在当前处理器的下一个出站处理器继续传递消息,减少了调用长度;
1.3 减少ChannelHandler的创建
如果你自定义的ChannelHandler是无状态的(即不需要保存任何状态参数,行为是确定的,不会因为不同的连接产生不同的结果),那么可以使用@Sharable注解,且在bootstrap时只创建一个实例,避免每次连接都new出handler对象;
- @Sharable:是Netty提供的注解,用于标记ChannelHandler可以被多个Channel共享;
- 有状态的ChannelHandler(如ByteToMessageDecoder等)不能使用@Sharable注解;
1.4 Boss与Worker线程配置优化
1.4.1 Boss线程优化
在Netty中Boss线程主要负责接受客户端的连接请求,通常默认为Boss线程数为一个线程,当有大量设备或客户端同时连接,Boss线程的处理能力可能会成为瓶颈;
优化思路:
- 多端口监听:让服务端监听多个端口,以分摊单个端口的连接压力;
- 主从Reactor模型:主Reactor负责处理客户端连接请求,从Reactor负责处理网络IO操作,每个ServerSocketChannel(监听客户端连接的对象)都有一个Boss线程,这样可以并行处理多个客户端的连接请求;
1.4.2 Worker线程池优化
Netty中的Worker线程(也叫IO线程)负责处理所有与数据读写相关操作,默认Netty会为Worker线程池分配一个大小为CPU内核数*2的线程数;
优化思路:
- 增加IO线程数:如果系统运行过程中IO线程的CPU占用率过高,或者Worker线程在处理读写操作时遇到瓶颈,那么可以考虑增加IO线程数来提升性能;
配置Worker线程数方法:
- 通过Netty API指定线程数:创建NioEventLoopGroup时,显示指定IO线程池大小,new NioEventLoopGroup(16);
- 通过系统参数指定线程数:直接设置JVM启动参数(-Dio.netty.eventLoopThreads=16),这样设置所有的NioEventLoopGroup实例都会使用该值;
1.5 线程隔离优化
如果服务端不做复杂的业务逻辑操作,仅是简单的内存操作和消息转发,则可以通过调大NioEventLoop工作线程池的方式,直接在IO线程中执行业务ChannelHandler,这样便减少了一次线程上下文切换,性能反而更高;
如果有复杂的业务逻辑操作,建议IO线程和业务线程分离,对于IO线程,由于互相之间不存在锁竞争,可以创建一个大的NioEventLoopGroup线程组(具体线程数量要根据CPU核心数、连接数和负载情况调整,线程过多会带来线程上下文切换开销大,反而可能性能降低),所有Channel都共享同一个线程池;对于后端的业务线程池,则建议创建多个小的业务线程池,线程池可以与IO线程绑定,这样既减少了锁竞争,又提升了后端的处理性能;
1.6 接受和发送缓冲区优化
对于不同的应用场景,收发缓冲区的最优值可能不同,需要根据实际场景结合性能测试数据进行优化;比如对实时性要求比较高的应用可以适当减少缓冲区大小,以减少数据在网络缓冲区内的停留时间;如对吞吐量要求高的应用可以适当增加缓冲区,减少频繁的网络IO,提高传输效率;
配置参数(在启动Bootstrap时配置):
- ChannelOption.SO_SNDBUF:发送缓冲区大小,发送缓冲区用于保存发送数据,直到发送成功;
- ChannelOption.SO_RCVBUF:接收缓冲区大小,接收缓冲区用于保存网络协议站内收到的数据,直到程序读取成功;
1.7 一些配置参数的设置
在对于响应时间有要求的场景,使用.childOption(ChannelOption.TCP_NODELAY, true)
和.option(ChannelOption.TCP_NODELAY, true)来禁用nagle算法,不等待,立即发送;
2. Netty核心源码
2.1 Netty启动过程
2.1.1 创建服务端Channel
- ServerBootstrap对象的bind()方法作为入口;
- AbstractBootstrap中的initAndRegister()进行创建和初始化Channel;
- ChannelFactory.newChannel()使用反射创建NioServerSocketChannel;
- NioServerSocketChannel的构造方法中,通过JDK提供的SelectorProvider来打开ServerSocketChannel;
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
- SelectorProvider是JavaNIO提供的一个工厂类,用于生成与操作系统适配的通道与选择器,通过openServerSocketChannel()打开一个新的ServerSocketChannel实例(ServerSocketChannel是JDK提供的非阻塞服务通道类,支持异步IO操作);
- 在AbstractNioChannel的构造方法中调用SelectableChannel继承的AbstractSelectableChannel的方法configureBlocking(false)将其设置为非阻塞模式;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException var7) {
try {
ch.close();
} catch (IOException var6) {
logger.warn("Failed to close a partially initialized socket.", var6);
}
throw new ChannelException("Failed to enter non-blocking mode.", var7);
}
}
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
boolean blocking = !nonBlocking;
if (block != blocking) {
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
implConfigureBlocking(block);
nonBlocking = !block;
}
}
return this;
}
- AbstractNioChannel继承AbstractChannel,使用super(parent)调用AbstractChannel的构造方法,创建Channel的id(唯一标识)、unsafe(底层操作接口)和pipeline(数据处理链);
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
- 通过NioServerSocketChannelConfig获取一些TCP底层参数;
2.1.2 初始化服务端Channel
- AbstractBootstrap中的initAndRegister()进行初始化channel;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
- init(channel)会对Channel进行初始化配置,AbstractBootstrap这里只是抽象方法,在ServerBootstrap与Bootstrap的init()方法的实现中设置channelOptions以及Attributes等;
- 将用户在 ServerBootstrap 上配置的 ChannelOption 和 AttributeKey 临时保存起来,然后应用到服务器端的 Channel 上。这样做是为了确保在启动 Channel 前,所有配置都已正确设置;
-
currentChildOptions 和 currentChildAttrs 是用来暂存用户定义的 Channel 配置选项和属性的局部变量,这样可以在初始化通道时将这些选项和属性传递给每个新创建的客户端 Channel,确保每个 Channel 都具有一致的配置;
-
如果设置了serverBootstrap.handler()的话,会加入到pipeline中;
-
添加连接器ServerBootstrapAcceptor,有新连接加入后,将自定义的childHandler加入到连接的pipeline中;
void init(Channel channel) {
setChannelOptions(channel, this.newOptionsArray(), logger);
setAttributes(channel, this.newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = this.childGroup;
final ChannelHandler currentChildHandler = this.childHandler;
final Map.Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(this.childOptions);
final Map.Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(this.childAttrs);
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}
2.1.3 注册selector
- initAndRegister()方法中的ChannelFuture regFuture = config().group().register(channel);进行
注册 ;(代码见初始化服务端Channel) - 在AbstractChannel中的内部类AbstractUnsafe的register()方法中完成实际注册;
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractChannel.AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var4);
}
}
}
}
- AbstractChannel.this.eventLoop = eventLoop; 进行eventLoop的赋值操作,后续的IO事件
工作将在由该eventLoop执行; - 调用register0(promise)中的doRegister进行实际的注册;
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
return;
}
boolean firstRegistration = this.neverRegistered;
AbstractChannel.this.doRegister();
this.neverRegistered = false;
AbstractChannel.this.registered = true;
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive();
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
} catch (Throwable var3) {
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var3);
}
}
- 在AbstractNioChannel中对doRegister进行了方法实现;
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
2.1.4 绑定端口
- 启动Netty的时候调用AbstracBootstrap的bind()方法,channelFuture = bootstrap.bind(port).sync();
- 在AbstracBootstrap的doBind0(),启动一个线程进行执行绑定端口操作;
public ChannelFuture bind(int inetPort) {
return this.bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
- 调用AbstractChannelHandlerContext的bind()方法,再次启动线程执行,这个方法会在 ChannelPipeline 中沿着出站方向,依次触发每个出站处理器的 bind() 方法,最终交由实际的 Channel 执行底层绑定操作;
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (this.isNotValidPromise(promise, false)) {
return promise;
} else {
final AbstractChannelHandlerContext next = this.findContextOutbound(512);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, (Object)null, false);
}
return promise;
}
}
-
当绑定操作到达 NioServerSocketChannel 时,会调用 doBind() 方法,doBind() 方法通过底层 JDK 的ServerSocketChannel.bind(SocketAddress) 方法完成绑定,使服务端 Channel 在指定地址上开始监听连接请求;
@SuppressJava6Requirement(
reason = "Usage guarded by java version check"
)
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
this.javaChannel().bind(localAddress, this.config.getBacklog());
} else {
this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
}
}
2.2 连接请求过程
2.2.1 新连接的接入
- 入口在NioEventLoop的processSelectedKey(),当有新的连接请求时,NioEventLoop会通过Selector监听到该事件(OP_ACCEPT),然后processSelectedKey()被触发,处理事件;
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
NioEventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable var6) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
} else {
try {
int readyOps = k.readyOps();
if ((readyOps & 8) != 0) {
int ops = k.interestOps();
ops &= -9;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & 4) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & 17) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException var7) {
unsafe.close(unsafe.voidPromise());
}
}
}
- 调用AbstractNioChannel的内部类NioMessageUnsafe的read()方法(实现NioUnsafe接口);
- read()中调用底层的doReadMessage()方法;
public void read() {
assert AbstractNioMessageChannel.this.eventLoop().inEventLoop();
ChannelConfig config = AbstractNioMessageChannel.this.config();
ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();
RecvByteBufAllocator.Handle allocHandle = AbstractNioMessageChannel.this.unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
int localRead;
try {
do {
localRead = AbstractNioMessageChannel.this.doReadMessages(this.readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while(AbstractNioMessageChannel.this.continueReading(allocHandle));
} catch (Throwable var11) {
exception = var11;
}
localRead = this.readBuf.size();
for(int i = 0; i < localRead; ++i) {
AbstractNioMessageChannel.this.readPending = false;
pipeline.fireChannelRead(this.readBuf.get(i));
}
this.readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = AbstractNioMessageChannel.this.closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
AbstractNioMessageChannel.this.inputShutdown = true;
if (AbstractNioMessageChannel.this.isOpen()) {
this.close(this.voidPromise());
}
}
} finally {
if (!AbstractNioMessageChannel.this.readPending && !config.isAutoRead()) {
this.removeReadOp();
}
}
}
- NioServerSocketChannel中实现了AbstractNioChannel的doReadMessages()方法,通过SocketUtil调用JDK NIO的ServerSocketChannel的accept()方法接收新的连接;
- 每次接收到新的SocketChannel后,将其封装成NioSocketChannel并添加到buf列表中;
- 这个buf列表会在后续流程中被处理,将新连接添加到ChannelPipeline中,触发相关的ChannelHandler进行链接的初始化和读写操作;
protected int doReadMessages(List<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = SocketUtils.accept(this.javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);
try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}
return 0;
}
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return (SocketChannel)AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException var2) {
throw (IOException)var2.getCause();
}
}
2.2.2 注册读事件
-
AbstractNioChannel的内部类NioMessageUnsafe的read()方法中通过以下遍历bug列表,将接收到的连接逐一传递到ChannelPipeline中,触发相关的ChannelHandler进行链接的初始化和读写操作;
for(int i = 0; i < localRead; ++i) {
AbstractNioMessageChannel.this.readPending = false;
pipeline.fireChannelRead(this.readBuf.get(i));
}
-
AbstractChannelHandlerContext的invokeChannelRead(Object msg)方法负责将读事件传递给 ChannelPipeline中的下一个ChannelHandler,并确保事件能被正确处理;
private void invokeChannelRead(Object msg) {
if (this.invokeHandler()) {
try {
// 执行 channelRead,注意第一次执行是 HeadHandler,第二次是 ServerBootstrapAcceptor
// ServerBootstrapAcceptor 用于处理新连接的接入,并注册 selector 监听读事件
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.invokeExceptionCaught(var3); // 处理异常
}
} else {
this.fireChannelRead(msg); // 将事件继续传递给下一个 Handler
}
}