当前位置: 首页 > article >正文

Netty源码—1.服务端启动流程二

大纲

1.服务端启动整体流程及关键方法

2.服务端启动的核心步骤

3.创建服务端Channel的源码

4.初始化服务端Channel的源码

5.注册服务端Channel的源码

6.绑定服务端端口的源码

7.服务端启动流程源码总结

5.注册服务端Channel的源码

(1)注册服务端Channel的入口

(2)注册Selector的主要步骤

(3)注册服务端Channel总结

(1)注册服务端Channel的入口

首先AbstractBootstrap的config()方法是一个抽象方法,会由ServerBootstrap来实现。

ServerBootstrap的config()方法会返回一个封装了ServerBootstrap对象的ServerBootstrapConfig对象。所以执行代码config().group()时会调用AbstractBootstrapConfig的group()方法,也就是执行ServerBootstrap的group()方法返回用户通过group()方法设置的一个NioEventLoopGroup对象。因此config().group().register(channel)最后会调用NioEventLoopGroup的register()方法。

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. 
//It support method-chaining to provide an easy way to configure the AbstractBootstrap.
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    volatile EventLoopGroup group;
    ...
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        ...
        //1.创建服务端Channel
        channel = channelFactory.newChannel();
        //2.初始化服务端Channel
        init(channel);
        ...
        //3.注册服务端Channel并启动一个NioEventLoop线程,通过NioEventLoopGroup的register()方法进行注册
        ChannelFuture regFuture = config().group().register(channel);
        ...
        return regFuture;
    }
    //Returns the AbstractBootstrapConfig object that can be used to obtain the current config of the bootstrap.
    public abstract AbstractBootstrapConfig<B, C> config();
    //Returns the configured EventLoopGroup or null if non is configured yet.
    public final EventLoopGroup group() {
        return group;
    }
    ...
}

//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    ...
    @Override
    public final ServerBootstrapConfig config() {
        return config;
    }
    ...
}

public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
    protected final B bootstrap;
    ...
    protected AbstractBootstrapConfig(B bootstrap) {
        this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
    }
    //Returns the configured EventLoopGroup or null if non is configured yet.
    public final EventLoopGroup group() {
        //比如返回一个NioEventLoopGroup对象
        return bootstrap.group();
    }
    ...
}

NioEventLoopGroup继承自抽象类MultithreadEventLoopGroup,调用NioEventLoopGroup的register()方法也就是调用MultithreadEventLoopGroup的register()方法。

调用NioEventLoopGroup的register()方法时,会先通过next()方法获取一个NioEventLoop对象,然后再调用NioEventLoop的register()方法。而调用NioEventLoop的register()方法,其实就是调用抽象类SingleThreadEventLoop的register()方法。

在SingleThreadEventLoop的register()方法中,promise.channel().unsafe()会返回一个Channel.Unsafe类型的对象。而AbstractChannel实现了Channel接口,AbstractChannel的内部类AbstractUnsafe也实现了Channel接口的内部接口Unsafe。

所以promise.channel().unsafe().register(this, promise)最后会调用AbstractUnsafe的register()方法。

//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    ...
    ...
}

//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    ...
    @Override
    public ChannelFuture register(Channel channel) {
        //先通过next()方法获取一个NioEventLoop,然后通过NioEventLoop.register()方法注册服务端Channel
        return next().register(channel);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    ...
}

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    ...
}

//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //调用AbstractUnsafe的register()方法
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    ...
}

所以注册服务端Channel的关键逻辑其实就体现在AbstractUnsafe的register()方法上。该方法会先将EventLoop事件循环器绑定到服务端Channel即NioServerSocketChanel上,然后再调用AbstractUnsafe的register0()方法将服务端Channel注册到Selector上。

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private volatile EventLoop eventLoop;
    ...
    //Unsafe implementation which sub-classes must extend and use.
    protected abstract class AbstractUnsafe implements Unsafe {
        ...
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            //绑定事件循环器,即绑定一个NioEventLoop到该Channel上
            AbstractChannel.this.eventLoop = eventLoop;
            //注册Selector,并启动一个NioEventLoop
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                ...
                //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
                ...
            }
        }

        private void register0(ChannelPromise promise) {
            ...
        }
        ...
    }
    ...
}

注意:AbstractUnsafe的register()方法会将前面获取到的一个NioEventLoop事件循环器绑定到服务端Channel上,之后便可以通过channel.eventLoop()来取出这个NioEventLoop事件循环器了。因此,一个服务端Channel对应一个NioEventLoop事件循环器。此外,会通过启动一个NioEventLoop线程来调用register0()方法将服务端Channel注册到Selector上。

总结:创建服务端Channel后,就会从NioEventLoopGroup中获取一个NioEventLoop出来进行绑定,并启动这个NioEventLoop线程将这个服务端Channel注册到Selector上以及执行线程的run()逻辑监听事件等。

(2)注册Selector的主要步骤

AbstractUnsafe.register0()方法主要有4个步骤。

步骤一:调用JDK底层注册服务端Channel到Selector上

doRegister()方法是由AbstractChannel的子类AbstractNioChannel来实现的。

在AbstractNioChannel的doRegister()方法中,首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法,将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上。这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来。而且注册的ops值是0,表示此时还不关注任何事件。

步骤二:回调handlerAdded事件

步骤三:传播channelRegisterd事件

步骤四:其他逻辑

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private volatile EventLoop eventLoop;
    ...
    //Unsafe implementation which sub-classes must extend and use.
    protected abstract class AbstractUnsafe implements Unsafe {
        ...
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            //绑定事件循环器,即绑定一个NioEventLoop到该Channel上
            AbstractChannel.this.eventLoop = eventLoop;
            //注册Selector,并启动一个NioEventLoop
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                ...
                //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
                ...
            }
        }

        private void register0(ChannelPromise promise) {
            ...
            boolean firstRegistration = this.neverRegistered;
            //1.调用JDK底层注册服务端Channel到Selector上
            doRegister();
            this.neverRegistered = false;
            this.registered = true;
            //2.回调handlerAdded事件
            this.pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            //3.传播channelRegisterd事件到用户代码里
            this.pipeline.fireChannelRegistered();
            //4.其他逻辑
            if (isActive()) {
                if (firstRegistration) {
                    this.pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
            ...
        }
        ...
    }
    
    //Is called after the Channel is registered with its EventLoop as part of the register process.
    //Sub-classes may override this method
    protected void doRegister() throws Exception {
        // NOOP
    }
    ...
}

//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;//这是NIO中的Channel
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    ...
    //Create a new instance
    //@param parent,the parent Channel by which this instance was created. May be null.
    //@param ch,he underlying SelectableChannel on which it operates
    //@param readInterestOp,the ops to set to receive data from the SelectableChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中
        //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ...
        //设置Channel对象为非阻塞模式
        ch.configureBlocking(false);
        ...
    }
    
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            ...
            //首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法,
            //将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上;
            //这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来;
            //而且注册的ops值是0,表示此时还不关注任何事件;
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
            ...
        }
    }
        
    protected SelectableChannel javaChannel() {
        return ch;
    }
    ...
}

(3)注册服务端Channel总结

注册服务端Channel的入口是AbstractChannel的内部类AbstractUnsafe的register()方法。

首先会把一个NioEventLoop线程和当前的Channel进行绑定,然后再调用AbstractUnsafe的register0()方法进行注册。而register0()方法会把前面创建的JDK底层NIO的Channel注册到Selector上,并且把Netty领域的Channel当作一个attachment绑定到Selector上去,最后回调handlerAdded事件以及传播channelRegistered事件到用户代码里。

ServerBootstrap.bind() //用户代码入口
  AbstractBootstrap.initAndRegister() //初始化并注册Channel
    channelFactory.newChannel() //创建服务端Channel
    ServerBootstrap.init() //初始化服务端Channel
    NioEventLoopGroup.register() //注册服务端Channel
      NioEventLoop.register() //注册服务端Channel
        AbstractChannel.AbstractUnsafe.register() //注册Channel入口
          this.eventLoop = eventLoop //将Channel绑定NioEventLoop线程
          AbstractChannel.AbstractUnsafe.register0() //实际注册
            AbstractNioChannel.doRegister() //调用JDK底层注册Channel到Selector
            invokeHandlerAddedIfNeeded() //回调handlerAdded事件
            fireChannelRegistered() //传播channelRegistered事件

补充说明一:Java类是单继承的,Java接口却是多继承的。因为前者不能区分父类相同名字方法要用哪一个,后者则由于还没实现接口,即使父类有相同名字接口也不影响。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    ...
}

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
}

补充说明二:如果监听一个端口,就创建一个服务端Channel。如果监听多个端口,就创建多个服务端Channel。

每个Channel绑定于NioEventLoopGroup的next()方法返回的一个NioEventLoop。

6.绑定服务端端口的源码

(1)绑定服务端端口的时机

(2)AbstractUnsafe.bind()方法的主要工作

(3)调用JDK底层绑定端口

(4)传播ChannelActive事件

(5)注册ACCEPT事件到Selector

(6)绑定服务端端口总结

(1)绑定服务端端口的时机

ServerBootstrap的bind()方法,首先执行AbstractBootstrap的initAndRegister()方法完成了服务端Channel的初始化和注册后,就会调用AbstractBootstrap的doBind0()方法绑定端口。

//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    ...
}

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. 
//It support method-chaining to provide an easy way to configure the AbstractBootstrap.
//When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP).
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    ...
    //Create a new Channel and bind it.
    public ChannelFuture bind(int inetPort) {
        //首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind()
        return bind(new InetSocketAddress(inetPort));
    }
    
    //Create a new Channel and bind it.
    public ChannelFuture bind(SocketAddress localAddress) {
        //验证服务启动需要的必要参数
        validate();
        if (localAddress == null) throw new NullPointerException("localAddress");
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel
        final Channel channel = regFuture.channel();
        ...
        doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口
        ...
        return promise;
    }
    
    private static void doBind0(final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

        //This method is invoked before channelRegistered() is triggered.
        //Give user handlers a chance to set up the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    ...
}

(2)AbstractUnsafe.bind()方法的主要工作

AbstractBootstrap的doBind0()方法会执行代码channel.bind(),这个channel其实就是通过channelFactory工厂反射生成的NioServerSocketChannel。

所以执行channel.bind()其实就是执行AbstractChannel的bind()方法。经过逐层调用,最后会落到调用AbstractChannel内部类AbstractUnsafe的bind()方法。

AbstractUnsafe的bind()方法主要做两件事:

一.调用JDK底层绑定端口

二.传播channelActive事件并注册ACCEPT事件

//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
    ...
}

//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    ...
}

//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {
    ...
}

//A skeletal {@link Channel} implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    private final DefaultChannelPipeline pipeline;
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    ...
}

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    ...
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) throw new NullPointerException("localAddress");
        if (!validatePromise(promise, false)) return promise;
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                //执行DefaultChannelPipeline.HeadContext的bind()方法
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
    ...
}

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
            //执行AbstractChannel内部类AbstractUnsafe的bind()方法
            unsafe.bind(localAddress, promise);
        }
        ...
    }
    ...
}

//A skeletal {@link Channel} implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    //Unsafe implementation which sub-classes must extend and use.
    protected abstract class AbstractUnsafe implements Unsafe {
        ...
        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...
            boolean wasActive = isActive();
            try {
                //1.调用JDK底层绑定端口
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //2.传播channelActive事件并注册ACCEPT事件
                        pipeline.fireChannelActive();
                    }
                });
            }
            safeSetSuccess(promise);
        }
        ...
    }
    ...
    //Bind the Channel to the SocketAddress
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
    ...
}

(3)调用JDK底层绑定端口

AbstractUnsafe的bind()方法中所调用的doBind()方法是属于AbstractChannel的抽象接口,会由NioServerSocketChannel来进行具体的实现,即调用JDK底层NIO的bind()方法来绑定端口。

//A skeletal {@link Channel} implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    //Bind the Channel to the SocketAddress
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
    ...
}

//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
    ...
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
    ...
}

//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;//这是NIO中的Channel
    ...
    protected SelectableChannel javaChannel() {
        return ch;
    }
    ...
}

(4)传播ChannelActive事件

绑定完端口后,就会执行代码pipeline.fireChannelActive(),也就是调用DefaultChannelPipeline.fireChannelActive()。

最后会调用DefaultChannelPipeline.HeadContext的channelActive()方法传播channelActive事件。

//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    ...
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                //执行DefaultChannelPipeline.HeadContext的channelActive()方法
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }
}

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        ...
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //1.传播channelActive事件
            ctx.fireChannelActive();
            //2.注册ACCEPT事件
            readIfIsAutoRead();
        }
        ...
    }
}

(5)注册ACCEPT事件到Selector

传播完channelActive事件后,便会调用HeadContext.readIfIsAutoRead()方法。然后逐层调用到AbstractChannel内部类AbstractUnsafe的beginRead()方法,并最终调用到AbstractNioChannel的doBeginRead()方法来注册ACCEPT事件。

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    private final Channel channel;
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        ...
        private void readIfIsAutoRead() {
            //isAutoRead()方法默认会返回true
            if (channel.config().isAutoRead()) {
                //调用AbstractChannel的read()方法
                channel.read();
            }
        }
        ...
    }
}

//A skeletal {@link Channel} implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }
    ...
}

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...
    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    ...
    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }
        return this;
    }

    private void invokeRead() {
        if (invokeHandler()) {
            try {
                //执行DefaultChannelPipeline.HeadContext的read()方法
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }
    ...
}

//The default ChannelPipeline implementation. 
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
        ...
    }
}

//A skeletal {@link Channel} implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    ...
    //Unsafe implementation which sub-classes must extend and use.
    protected abstract class AbstractUnsafe implements Unsafe {
        ...
        @Override
        public final void beginRead() {
            assertEventLoop();
            if (!isActive()) return;
            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
        ...
    }
    
    //Schedule a read operation.
    protected abstract void doBeginRead() throws Exception;
    ...
}

//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    boolean readPending;
    ...
    @Override
    protected void doBeginRead() throws Exception {
        //Channel.read() or ChannelHandlerContext.read() was called
        //this.selectionKey就是前面注册服务端Channel时返回的对象
        //注册服务端Channel时,注册ops的值是0,表示还不关注任何事件
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) return;

        readPending = true;
        final int interestOps = selectionKey.interestOps();
        //这里的readInterestOp就是前面newChannel()时传入的SelectionKey.OP_ACCEPT
        //所以这样要做的工作就是,告诉JDK的Selector一切工作准备就绪,只剩下把ACCEPT事件注册到Selector上
        if ((interestOps & readInterestOp) == 0) {
            //关注ACCEPT事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    ...
}

(6)绑定服务端端口总结

绑定服务端端口,最终会调用JDK底层API去进行实际绑定。绑定端口成功后,会由DefaultChannelPipeline传播channelActive事件,以及把ACCEPT事件注册到Selector上,从而可以通过Selector监听新连接的接入。

ServerBootstrap.bind() //用户代码入口
  AbstractBootstrap.initAndRegister() //初始化并注册Channel
    channelFactory.newChannel() //创建服务端Channel
    ServerBootstrap.init() //初始化服务端Channel
    NioEventLoopGroup.register() //注册服务端Channel
  AbstractBootstrap.doBind0() //绑定服务端端口
    AbstractChannel.AbstractUnsafe.bind() //绑定服务端端口入口
      NioServerSocketChannel.doBind() //NioServerSocketChannel实现
        javaChannel().bind() //JDK底层API绑定端口
      DefaultChannelPipeline.fireChannelActive() //传播channelActive事件
        HeadContext.readIfIsAutoRead() //注册ACCEPT事件到Selector上

7.服务端启动流程源码总结

initAndRegister()里的newChannel()会通过反射创建JDK底层Channel,同时会创建该Channel对应的Config对象并设置该Channel为非阻塞模式。总之,创建服务端Channel时会完成Netty几大基本组件的创建。如Channel、ChannelConfig、ChannelId、Unsafe、ChannelPipeline。

初始化服务端Channel时,会设置服务端Channel和客户端Channel的Option和Attr,并且给服务端Channel添加连接接入器ServerBootstrapAcceptor用于接收新连接。

注册服务端Channel时,会调用JDK底层的API将Channel注册到Selector,同时将Netty领域的Channel当作attachment注册到Selector上,并且回调handlerAdded事件和传播channelRegistered事件到其他用户代码中。

绑定服务端端口时,会调用JDK底层API进行端口绑定并传播channelActive事件。当channelActive事件被传播后,才真正进行有效的服务端端口绑定,也就是把ACCEPT事件注册到Selector上。


http://www.kler.cn/a/593392.html

相关文章:

  • 蓝桥杯 修剪灌木
  • OAK相机入门(二):深度噪声滤除
  • Centos内核升级
  • RTSP/Onvif安防监控系统EasyNVR级联视频上云系统EasyNVS报错“Login error”的原因排查与解决
  • Dify:开源大模型应用开发平台全解析
  • 解锁MySQL 8.0.41源码调试:Mac 11.6+CLion 2024.3.4实战指南
  • 河南大学数据库实验5
  • 涨薪技术|Kubernetes(k8s)之Pod环境变量
  • OpenEuler kinit报错找不到文件的解决办法
  • Mybatis使用Druid连接池
  • Node.js模块:使用 Bull 打造高效的任务队列系统
  • 【IDEA中配置Maven国内镜像源】
  • 微信小程序面试内容整理-如何使用wx.request()进行网络请求
  • 链表操作:分区与回文判断
  • Linux与深入HTTP序列化和反序列化
  • C++内存分配方式
  • CVPR2024 | SWARM | 并非所有提示都安全:针对预训练视觉Transformer的可开关后门攻击
  • Python 监听模式(Observer Pattern)
  • 【LeetCode】622、设计循环队列
  • 软考程序员考试知识点汇总