当前位置: 首页 > article >正文

Netty源码—5.Pipeline和Handler一

大纲

1.Pipeline和Handler的作用和构成

2.ChannelHandler的分类

3.几个特殊的ChannelHandler

4.ChannelHandler的生命周期

5.ChannelPipeline的事件处理

6.关于ChannelPipeline的问题整理

7.ChannelPipeline主要包括三部分内容

8.ChannelPipeline的初始化

9.ChannelPipeline添加ChannelHandler

10.ChannelPipeline删除ChannelHandler

11.Inbound事件的传播

12.Outbound事件的传播

13.ChannelPipeline中异常的传播

14.ChannelPipeline总结

1.Pipeline和Handler的作用和构成

(1)Pipeline和Handler的作用

(2)Pipeline和Handler的构成

(1)Pipeline和Handler的作用

可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

Netty通过责任链模式来组织代码逻辑,能够支持逻辑的动态添加和删除,能够支持各类协议的扩展。

(2)Pipeline和Handler的构成

在Netty里,一个连接对应着一个Channel。这个Channel的所有处理逻辑都在一个叫ChannelPipeline的对象里,ChannelPipeline是双向链表结构,它和Channel之间是一对一的关系。

ChannelPipeline里的每个结点都是一个ChannelHandlerContext对象,这个ChannelHandlerContext对象能够获得和Channel相关的所有上下文信息。每个ChannelHandlerContext对象都包含一个逻辑处理器ChannelHandler,每个逻辑处理器ChannelHandler都处理一块独立的逻辑。

2.ChannelHandler的分类

ChannelHandler有两大子接口,分别为Inbound和Outbound类型:第一个子接口是ChannelInboundHandler,用于处理读数据逻辑,最重要的方法是channelRead()。第二个子接口是ChannelOutboundHandler,用于处理写数据逻辑,最重要的方法是write()。

这两个子接口默认的实现分别是:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。它们分别实现了两个子接口的所有功能,在默认情况下会把读写事件传播到下一个Handler。

InboundHandler的事件通常只会传播到下一个InboundHandler,OutboundHandler的事件通常只会传播到下一个OuboundHandler,InboundHandler的执行顺序与实际addLast的添加顺序相同,OutboundHandler的执行顺序与实际addLast的添加顺序相反。

Inbound事件通常由IO线程触发,如TCP链路的建立事件、关闭事件、读事件、异常通知事件等。其触发方法一般带有fire字眼,如下所示:

ctx.fireChannelRegister()、

ctx.fireChannelActive()、

ctx.fireChannelRead()、

ctx.fireChannelReadComplete()、

ctx.fireChannelInactive()。

Outbound事件通常由用户主动发起的网络IO操作触发,如用户发起的连接操作、绑定操作、消息发送等操作。其触发方法一般如:ctx.bind()、ctx.connect()、ctx.write()、ctx.flush()、ctx.read()、ctx.disconnect()、ctx.close()。

3.几个特殊的ChannelHandler

(1)ChannelInboundHandlerAdapter

(2)ChannelOutboundHandlerAdapter

(3)ByteToMessageDecoder

(4)SimpleChannelInboundHandler

(5)MessageToByteEncoder

(1)ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter主要用于实现ChannelInboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline.
public interface ChannelHandler {
    //Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    //Gets called after the ChannelHandler was removed from the actual context and it doesn't handle events anymore.
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    //Gets called if a Throwable was thrown.
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    //Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
    // Not using volatile because it's used only for a sanity check.
    boolean added;

    //Return true if the implementation is Sharable and so can be added to different ChannelPipelines.
    public boolean isSharable() {
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

    //Do nothing by default, sub-classes may override this method.
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    //Do nothing by default, sub-classes may override this method.
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

//ChannelHandler which adds callbacks for state changes. 
//This allows the user to hook in to state changes easily.
public interface ChannelInboundHandler extends ChannelHandler {
    //The Channel of the ChannelHandlerContext was registered with its EventLoop
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    //The Channel of the ChannelHandlerContext was unregistered from its EventLoop
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    //The Channel of the ChannelHandlerContext is now active
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    //The Channel of the ChannelHandlerContext was registered is now inactive and reached its end of lifetime.
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    //Invoked when the current Channel has read a message from the peer.
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    //Invoked when the last message read by the current read operation has been consumed by #channelRead(ChannelHandlerContext, Object).
    //If ChannelOption#AUTO_READ is off, no further attempt to read an inbound data from the current Channel will be made until ChannelHandlerContext#read() is called.
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    //Gets called if an user event was triggered.
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    //Gets called once the writable state of a Channel changed. 
    //You can check the state with Channel#isWritable().
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    //Gets called if a Throwable was thrown.
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

//Abstract base class for ChannelInboundHandler implementations which provide implementations of all of their methods.
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    //Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    //Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    //Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    //Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    //Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    //Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    //Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    //Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline.
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    //Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

(2)ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter主要用于实现ChannelOutboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//ChannelHandler which will get notified for IO-outbound-operations.
public interface ChannelOutboundHandler extends ChannelHandler {
    //Called once a bind operation is made.
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    //Called once a connect operation is made.
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    //Called once a disconnect operation is made.
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //Called once a close operation is made.
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //Called once a deregister operation is made from the current registered EventLoop.
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    //Intercepts ChannelHandlerContext#read().
    void read(ChannelHandlerContext ctx) throws Exception;

    //Called once a write operation is made. The write operation will write the messages through the ChannelPipeline.
    //Those are then ready to be flushed to the actual Channel once Channel#flush() is called.
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    //Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending.
    void flush(ChannelHandlerContext ctx) throws Exception;
}

//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. 
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }

    //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }

    //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    //Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

(3)ByteToMessageDecoder

基于这个ChannelHandler可以实现自定义解码,而不用关心ByteBuf的强转和解码结果的传递。Netty里的ByteBuf默认下使用的是堆外内存,ByteToMessageDecoder会自动进行内存的释放,不用操心内存管理。我们自定义的ChannelHandler继承了ByteToMessageDecoder后,需要实现decode()方法。

(4)SimpleChannelInboundHandler

基于这个ChannelHandler可以实现每一种指令的处理,不再需要强转、不再有冗长的if else逻辑、不再需要手动传递对象。

同时还可以自动释放没有往下传播的ByteBuf,因为我们编写指令处理ChannelHandler时,可能会编写不用关心的if else判断,然后手动传递无法处理的对象至下一个指令处理器。

//xxxHandler.java
if (packet instanceof xxxPacket) {
    //进行处理
} else {
    ctx.fireChannelRead(packet);
}

(5)MessageToByteEncoder

基于这个ChannelHandler可以实现自定义编码,而不用关心ByteBuf的创建,不用把创建完的ByteBuf进行返回。

4.ChannelHandler的生命周期

(1)ChannelHandler回调方法的执行顺序

ChannelHandler回调方法的执行顺序可以称为ChannelHandler的生命周期。

新建连接时ChannelHandler回调方法的执行顺序是:handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()。

关闭连接时ChannelHandler回调方法的执行顺序是:channelInactive() -> channelUnregistered() -> handlerRemoved()。

接下来是ChannelHandler具体的回调方法说明,其中一二三的顺序可以参考AbstractChannel的内部类AbstractUnsafe的register0()方法。

一.handlerAdded()

检测到新连接后调用"ch.pipeline().addLast(...)"之后的回调,表示当前Channel已成功添加一个ChannelHandler。

二.channelRegistered()

表示当前Channel已和某个NioEventLoop线程建立了绑定关系,已经创建了一个Reactor线程来处理当前这个Channel的读写。

三.channelActive()

当Channel的Pipeline已经添加完所有的ChannelHandler以及绑定好一个NioEventLoop线程,这个Channel对应的连接才算真正被激活,接下来就会回调该方法。

四.channelRead()

服务端每次收到客户端发送的数据时都会回调该方法,表示有数据可读。

五.channelReadComplete()

服务端每次读完一条完整的数据都会回调该方法,表示数据读取完毕。

六.channelInactive()

表示这个连接已经被关闭,该连接在TCP层已经不再是ESTABLISH状态。

七.channelUnregister()

表示与这个连接对应的NioEventLoop线程移除了对这个连接的处理。

八.handlerRemoved()

表示给这个连接添加的所有的ChannelHandler都被移除了。

(2)ChannelHandler回调方法的应用场景

一.handlerAdded()方法与handlerRemoved()方法通常可用于一些资源的申请和释放。

二.channelActive()方法与channelInactive()方法表示的是TCP连接的建立与释放,可用于统计单机连接数或IP过滤。

三.channelRead()方法可用于根据自定义协议进行拆包。每次读到一定数据就累加到一个容器里,然后看看能否拆出完整的包。

四.channelReadComplete()方法可用于实现批量刷新。如果每次向客户端写数据都通过writeAndFlush()方法写数据并刷新到底层,其实并不高效。所以可以把调用writeAndFlush()方法的地方换成调用write()方法,然后再在channelReadComplete()方法里调用ctx.channel().flush()。

5.ChannelPipeline的事件处理

(1)消息读取和发送被Pipeline处理的过程

(2)ChannelPipeline的主要特征

(1)消息读取和发送被Pipeline处理的过程

消息的读取和发送被ChannelPipeline的ChannelHandler链拦截和处理的全过程:

一.首先AbstractNioChannel内部类NioUnsafe的read()方法读取ByteBuf时会触发ChannelRead事件,也就是由NioEventLoop线程调用ChannelPipeline的fireChannelRead()方法将ByteBuf消息传输到ChannelPipeline中。

二.然后ByteBuf消息会依次被HeadContext、xxxChannelHandler、...、TailContext拦截处理。在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递。

三.接着用户可能会调用ChannelHandlerContext的write()方法发送ByteBuf消息。此时ByteBuf消息会从TailContext开始,途径xxxChannelHandler、...、HeadContext,最终被添加到消息发送缓冲区中等待刷新和发送。在这个过程中,任何ChannelHandler都可以中断当前的流程,中断消息的传递。

(2)ChannelPipeline的主要特征

一.ChannelPipeline支持运行时动态地添加或者删除ChannelHandler

例如业务高峰时对系统做拥塞保护。处于业务高峰期时,则动态地向当前的ChannelPipeline添加ChannelHandler。高峰期过后,再移除ChannelHandler。

二.ChannelPipeline是线程安全的

多个业务线程可以并发操作ChannelPipeline,因为使用了synchronized关键字。但ChannelHandler却不一定是线程安全的,这由用户保证。

6.关于ChannelPipeline的问题整理

一.Netty是如何判断ChannelHandler类型的?

即如何判断一个ChannelHandler是Inbound类型还是Outbound类型?

答:当调用Pipeline去添加一个ChannelHandler结点时,旧版Netty会使用instanceof关键字来判断该结点是Inbound类型还是Outbound类型,并分别用一个布尔类型的变量来进行标识。新版Netty则使用一个整形的executionMask来具体区分详细的Inbound事件和Outbound事件。这个executionMask对应一个16位的二进制数,是哪一种事件就对应哪一个Mask。

//Inbound事件的Mask
MASK_EXEPTION_CAUGHT = 1;
MASK_CHANNEL_REGISTER = 1 << 1;
MASK_CHANNEL_UNREGISTER = 1 << 2;
MASK_CHANNEL_ACTIVE = 1 << 3;
MASK_CHANNEL_INACTIVE = 1 << 4;
MASK_CHANNEL_READ = 1 << 5;
MASK_CHANNEL_READ_COMPLETE = 1 << 6;
MASK_CHANNEL_USER_EVENT_TRIGGERED = 1 << 7;
MASK_CHANNEL_WRITABLITY_CHANGED = 1 << 8;

//Outbound事件的Mask
MASK_BIND = 1 << 9;
MASK_CONNECT = 1 << 10;
MASK_DISCONNECT = 1 << 11;
MASK_CLOSE = 1 << 12;
MASK_DEREGISTER = 1 << 13;
MASK_READ = 1 << 14;
MASK_WRITE = 1 << 15;
MASK_FLUSH = 1 << 16;

二.添加ChannelHandler时应遵循什么样的顺序?

答:Inbound类型的事件传播跟添加ChannelHandler的顺序一样,Outbound类型的事件传播跟添加ChannelHandler的顺序相反。

三.用户手动触发事件传播的两种方式有什么区别?

这两种方式是分别是:ctx.writeAndFlush()和ctx.channel().writeAndFlush()。

答:当通过Channel去触发一个事件时,那么该事件会沿整个ChannelPipeline传播。如果是Inbound类型事件,则从HeadContext结点开始向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从TailContext结点开始向前传播到第一个Outbound类型的结点。当通过当前结点去触发一个事件时,那么该事件只会从当前结点开始传播。如果是Inbound类型事件,则从当前结点开始一直向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从当前结点开始一直向前传播到第一个Outbound类型的结点。

7.ChannelPipeline主要包括三部分内容

一.ChannelPipeline的初始化

服务端Channel和客户端Channel在何时初始化ChannelPipeline?在初始化时又做了什么事情?

二.添加和删除ChannelHandler

Netty是如何实现业务逻辑处理器动态编织的?

三.事件和异常的传播

读写事件和异常在ChannelPipeline中的传播。

8.ChannelPipeline的初始化

(1)ChannelPipeline的初始化时机

(2)ChannelPipeline的初始化内容

(3)ChannelPipeline的说明

(1)ChannelPipeline的初始化时机

在服务端启动和客户端连接接入的过程中,在创建NioServerSocketChannel和NioSocketChannel时,会逐层执行父类的构造方法,最后执行到AbstractChannel的构造方法。AbstractChannel的构造方法会将Netty的核心组件创建出来。而核心组件中就包含了DefaultChannelPipeline类型的ChannelPipeline组件。

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { 
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    ...
    //Creates a new instance.
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    
    //Returns a new DefaultChannelPipeline instance.
    protected DefaultChannelPipeline newChannelPipeline() {
        //创建ChannelPipeline组件
        return new DefaultChannelPipeline(this);
    }
    ...
}

//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;//保存了Channel的引用
    ...
    protected DefaultChannelPipeline(Channel channel) {
        //保存Channel的引用到Pipeline组件的成员变量
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        ...
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
    ...
}

(2)ChannelPipeline的初始化内容

ChannelPipeline的初始化主要涉及三部分内容:

一.Pipeline在创建Channel时被创建

二.Pipeline的结点是ChannelHandlerContext

三.Pipeline两大哨兵HeadContext和TailContext

(3)ChannelPipeline的说明

ChannelPipeline中保存了Channel的引用,ChannelPipeline中每个结点都是一个ChannelHandlerContext对象,每个ChannelHandlerContext结点都包裹着一个ChannelHandler执行器,每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline。由于ChannelPipeline又保存了Channel的引用,所以每个ChannelHandlerContext结点都可以拿到所有的上下文信息。

ChannelHandlerContext接口多继承自AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker。

ChannelHandlerContext的关键方法有:channel()、executor()、handler()、pipeline()、alloc()。ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能。

ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表。HeadContext结点会比TailContext结点多一个unsafe成员变量。

public class DefaultChannelPipeline implements ChannelPipeline {
    //ChannelPipeline中每个结点都是一个ChannelHandlerContext对象
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;//ChannelPipeline中保存了Channel的引用
    ...
    protected DefaultChannelPipeline(Channel channel) {
        //保存Channel的引用到Pipeline组件的成员变量
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        ...
        //ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
    
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        //HeadContext结点会比TailContext结点多一个unsafe成员变量
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
    }
    
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }
        ...
    }
    ...
}

//ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { 
    //每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline
    private final DefaultChannelPipeline pipeline;
    ...
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
}

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    //Return the Channel which is bound to the ChannelHandlerContext.
    Channel channel();
    
    //Returns the EventExecutor which is used to execute an arbitrary task.
    EventExecutor executor();

    //The unique name of the ChannelHandlerContext.
    //The name was used when then ChannelHandler was added to the ChannelPipeline. 
    //This name can also be used to access the registered ChannelHandler from the ChannelPipeline.
    String name();

    //The ChannelHandler that is bound this ChannelHandlerContext.
    ChannelHandler handler();

    //Return true if the ChannelHandler which belongs to this context was removed from the ChannelPipeline. 
    //Note that this method is only meant to be called from with in the EventLoop.
    boolean isRemoved();

    ChannelHandlerContext fireChannelRegistered();
    ChannelHandlerContext fireChannelUnregistered();
    ChannelHandlerContext fireChannelActive();
    ChannelHandlerContext fireChannelInactive();
    ChannelHandlerContext fireExceptionCaught(Throwable cause);
    ChannelHandlerContext fireUserEventTriggered(Object evt);
    ChannelHandlerContext fireChannelRead(Object msg);
    ChannelHandlerContext fireChannelReadComplete();
    ChannelHandlerContext fireChannelWritabilityChanged();
    ChannelHandlerContext read();
    ChannelHandlerContext flush();
    //Return the assigned ChannelPipeline
    ChannelPipeline pipeline();
    //Return the assigned ByteBufAllocator which will be used to allocate ByteBufs.
    ByteBufAllocator alloc();
    ...
}

9.ChannelPipeline添加ChannelHandler

(1)常见的客户端代码

(2)ChannelPipeline添加ChannelHandler入口

(3)DefaultChannelPipeline的addLast()方法

(4)检查是否重复添加ChannelHandler结点

(5)创建ChannelHandlerContext结点

(6)添加ChannelHandlerContext结点

(7)回调handerAdded()方法

(8)ChannelPipeline添加ChannelHandler总结

(1)常见的客户端代码

首先用一个拆包器Spliter对二进制数据流进行拆包,然后解码器Decoder会将拆出来的包进行解码,接着业务处理器BusinessHandler会处理解码出来的Java对象,最后编码器Encoder会将业务处理完的结果编码成二进制数据进行输出。

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();   
        p.addLast(newSpliter());
        p.addLast(new Decoder());
        p.addLast(new BusinessHandler());
        p.addLast(new Encoder());          
    }
});

整个ChannelPipeline的结构如下所示:

图片

这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是ChannelInboundHandler,用来处理Inbound事件,比如读取数据流进行加工处理。一种是ChannelOutboundHandler,用来处理Outbound事件,比如当调用writeAndFlush()方法时就会经过这种类型的Handler。

(2)ChannelPipeline添加ChannelHandler入口

当服务端Channel的Reactor线程轮询到新连接接入的事件时,就会调用AbstractNioChannel的内部类NioUnsafe的read()方法,也就是调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read()方法。

然后会触发执行代码pipeline.fireChannelRead()传播ChannelRead事件,从而最终触发调用ServerBootstrapAcceptor接入器的channelRead()方法。

在ServerBootstrapAcceptor的channelRead()方法中,便会通过执行代码channel.pipeline().addLast()添加ChannelHandler,也就是通过调用DefaultChannelPipeline的addLast()方法添加ChannelHandler。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    private boolean needsToSelectAgain;
    private int cancelledKeys;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            needsToSelectAgain = false;
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip()会返回一个数组
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            //1.首先取出IO事件
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;//Help GC
            //2.然后获取对应的Channel和处理该Channel
            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                //网络事件的处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //3.最后判断是否应该再进行一次轮询
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
                selectAgain();
                //selectedKeys.flip()会返回一个数组
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            ...
            //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入
            //此时将调用Channel的unsafe变量来进行实际操作
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法
                //进行新连接接入处理
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    ...
}

//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    ...
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        //临时存放读到的连接NioSocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();
        @Override
        public void read() {
            //断言确保该read()方法必须来自Reactor线程调用
            assert eventLoop().inEventLoop();
            //获得Channel对应的Pipeline
            final ChannelPipeline pipeline = pipeline();
            //获得Channel对应的RecvByteBufAllocator.Handle
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            do {
                //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel
                //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
            } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接
          
            //2.设置并绑定NioSocketChannel
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                //调用DefaultChannelPipeline的fireChannelRead()方法
                pipeline.fireChannelRead(readBuf.get(i));
            }
            //3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法
            readBuf.clear();
            pipeline.fireChannelReadComplete();
        }
    }
    ...
}

//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...
    protected DefaultChannelPipeline(Channel channel) {
        ...
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
    
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        //从Pipeline的第一个HeadContext处理器开始调用
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        ...
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //调用AbstractChannelHandlerContext的fireChannelRead()方法
            ctx.fireChannelRead(msg);
        }
     
        @Override
        public ChannelHandler handler() {
            return this;
        }
        ...
    }
    ...
}

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    //初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器
    @Override
    void init(Channel channel) throws Exception {
        //1.设置服务端Channel的Option与Attr
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
       
        //2.设置客户端Channel的Option与Attr
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
       
        //3.配置服务端启动逻辑
        ChannelPipeline p = channel.pipeline();
        //p.addLast()用于定义服务端启动过程中需要执行哪些逻辑
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                //一.添加用户自定义的Handler,注意这是handler,而不是childHandler
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) pipeline.addLast(handler);
                //二.添加一个特殊的Handler用于接收新连接
                //自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        //调用DefaultChannelPipeline的addLast()方法
                        pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, 
                            currentChildHandler, 
                            currentChildOptions, 
                            currentChildAttrs)
                        );
                    }
                });
            }
        });
    }

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        ...
        //channelRead()方法在新连接接入时被调用
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            //1.给新连接的Channel添加用户自定义的Handler处理器
            //这里的childHandler其实是一个特殊的Handler: ChannelInitializer
            child.pipeline().addLast(childHandler);
            //2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关
            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            }
            //3.设置新连接Channel的属性
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            //4.绑定Reactor线程
            //childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法
            childGroup.register(child);
        }
        ...
    }
    ...
}

(3)DefaultChannelPipeline的addLast()方法

使用synchronized关键字是为了防止多线程并发操作ChannelPipeline底层的双向链表,添加ChannelHandler结点的过程主要分为4个步骤:

步骤一:判断ChannelHandler是否重复添加

步骤二:创建结点

步骤三:添加结点到链表

步骤四:回调添加完成事件

这个结点便是ChannelHandlerContext,Pipeline里每个结点都是一个ChannelHandlerContext。addLast()方法便是把ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表。

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) throw new NullPointerException("handlers");
        for (ChannelHandler h: handlers) {
            if (h == null) break;
            addLast(executor, null, h);
        }
        return this;
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1.检查是否有重复的ChannelHandler结点
            checkMultiplicity(handler);
            //2.创建ChannelHandlerContext结点
            newCtx = newContext(group, filterName(name, handler), handler);
            //3.添加ChannelHandlerContext结点
            addLast0(newCtx);
            ...
        }
        //4.回调用户方法
        //通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
        callHandlerAdded0(newCtx);
        return this;
    }
    ...
}

(4)检查是否重复添加ChannelHandler结点

Netty使用了一个成员变量added来表示一个ChannelHandler是否已经添加。如果当前要添加的ChannelHandler是非共享的并且已经添加过,那么抛出异常,否则标识该ChannelHandler已添加。

如果一个ChannelHandler支持共享,那么它就可以无限次被添加到ChannelPipeline中。如果要让一个ChannelHandler支持共享,只需要加一个@Sharable注解即可。而ChannelHandlerAdapter的isSharable()方法正是通过判断该ChannelHandler对应的类是否标有@Sharable注解来实现的。

Netty为了性能优化,还使用了ThreadLocal来缓存ChannelHandler是否共享的情况。在高并发海量连接下,每次有新连接添加ChannelHandler都会调用isSharable()方法,从而优化性能。

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    ...
}

//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
    //Not using volatile because it's used only for a sanity check.
    boolean added;

    //Return true if the implementation is Sharable and so can be added to different ChannelPipelines.
    public boolean isSharable() {
        //Cache the result of Sharable annotation detection to workaround a condition. 
        //We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads. 
        //Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway.
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }
    ...
}

(5)创建ChannelHandlerContext结点

根据ChannelHandler创建ChannelHandlerContext类型的结点时,会将该ChannelHandler的引用保存到结点的成员变量中。

public class DefaultChannelPipeline implements ChannelPipeline {
    ...    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1.检查是否有重复的ChannelHandler结点
            checkMultiplicity(handler);
            //2.创建ChannelHandlerContext结点
            newCtx = newContext(group, filterName(name, handler), handler);
            //3.添加ChannelHandlerContext结点
            addLast0(newCtx);
            ...
        }
        //4.回调用户方法
        //通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
        callHandlerAdded0(newCtx);
        return this;
    }
    
    //给ChannelHandler创建一个唯一性的名字
    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
    
    //根据ChannelHandler创建一个ChannelHandlerContext结点
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    ...
}

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    ...
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    ...
}

(6)添加ChannelHandlerContext结点

使用尾插法向双向链表添加结点。

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    ...
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    ...
}

(7)回调handerAdded()方法

向ChannelPipeline添加完新结点后,会使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成,然后执行ctx.handler().handlerAdded(ctx),回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        //使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成
        ctx.setAddComplete();
        //回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法
        ctx.handler().handlerAdded(ctx);
    }
    ...
}

//DemoHandler是用户定义的ChannelHandler
public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法
    }
    ...
}

最典型的一个回调就是用户代码的ChannelInitializer被添加完成后,会先调用其initChannel()方法将用户自定义的ChannelHandler添加到ChannelPipeline,然后再调用pipeline.remove()方法将自身结点进行删除。

public class NettyServer {
    private int port;
    public NettyServer(int port) {
        this.port = port;
    }
  
    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            //设置一个ChannelInitializer类型的childHandler
            //新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)"
            //也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中
            //添加完这个结点后会回调ChannelInitializer的handlerAdded()方法
            //其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点
            //执行完initChannel()方法后,就会移除ChannelInitializer这个结点
            .childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    .addLast(new NettyServerHandler());//针对网络请求的处理逻辑
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口
            channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception{
        System.out.println("Starting Netty Server...");
        int port = 8998;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new NettyServer(port).start();
    }
}

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                //ChannelPipeline删除ChannelHandler结点(ChannelInitializer)
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
    ...
}

(8)ChannelPipeline添加ChannelHandler总结

一.判断ChannelHandler是否重复添加的依据是:如果该ChannelHandler不是共享的且已被添加过,则拒绝添加。

二.否则就创建一个ChannelHandlerContext结点(ctx),并把这个ChannelHandler包装进去,也就是保存ChannelHandler的引用到ChannelHandlerContext的成员变量中。由于创建ctx时保存了ChannelHandler的引用、ChannelPipeline的引用到成员变量,ChannelPipeline又保存了Channel的引用,所以每个ctx都拥有一个Channel的所有信息。

三.接着通过双向链表的尾插法,将这个ChannelHandlerContext结点添加到ChannelPipeline中。

四.最后回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。


http://www.kler.cn/a/599253.html

相关文章:

  • 2000-2019年各省地方财政耕地占用税数据
  • Tailwind CSS 学习笔记(四)
  • 免费试用优化指南:提升转化率的关键策略
  • STM32:关于NVIC的工作与优先级分组方式
  • std::endl为什么C++ 智能提示是函数?
  • Python----计算机视觉处理(Opencv:图像亮度变换)
  • 【HTML5】02-列表 + 表格 + 表单
  • C语言动态顺序表的实现
  • 日常学习开发记录-select组件(1)
  • 【Linux】同步原理剖析及模拟BlockQueue生产消费模型
  • 数据结构--红黑树
  • SpringBoot星之语明星周边产品销售网站设计与实现
  • 23种设计模式-组合(Composite)设计模式
  • 第十六届蓝桥杯康复训练--6
  • 【C++】类和对象(匿名对象)
  • 【Unity】批处理和实例化的底层优化原理(未完)
  • 图论 | 98. 所有可达路径
  • C++效率掌握之STL库:stack queue函数全解
  • vue java 实现大地图切片上传
  • 分页查询互动问题(用户端)