当前位置: 首页 > article >正文

Netty中的核心概念

事件传播机制

当pipeline中有多个handler时,netty内部的事件是如何向后传递到每个handler中的?

结论:需要在当前handler中手动将当前事件传递下去

1,如果handler是直接实现的接口,使用ChannelHandlerContext的fireXXX方法

2,如果handler是继承Adapter,直接调用父类即可,比如:super.channelRead(ctx, msg);

super.channelRead(ctx, msg);的本质也是调用了Context中的fireActive方法

    @Skip
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

ChannelOutboundHandler 可以用来做什么?

1,响应数据包装,压缩,加密,…等通用功能

ChannelHandler 的执行顺序

BootStrap

1,Bootstrap是引导的意思,它的作用是配置网络参数并引导Netty程序启动,将各个组件都串起来

2,Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器,它们之间的区别在于:

  • ServerBootstrap 需要指定绑定的端口,服务器开启监听接收连接,而 Bootstrap 则是想要连接到远程节点故需要对端的地址信息

  • 引导客户端只需要一个EventLoopGroup,服务端一般需要两个

Channel

Netty中的Channel是与网络套接字相关的,底层对应了一个socket连接,它负责基本的IO操作,比如:

  • read(),write() 等,它的一些核心作用有:

  • 获得当前网络连接的通道状态

  • 获得网络连接的配置参数(如缓冲区大小等)

  • 网络I/O操作

EventLoop和EventLoopGroup

1,Netty是基于事件驱动的,比如:连接就绪;数据读取;异常事件等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环)

2,Netty 中每个Channel 都会被分配到一个 EventLoop,一个 EventLoop 可以服务于多个 Channel的事件处理

3,每个 EventLoop 会占用一个 Thread,在这个Thread上处理相应的事件

4,EventLoopGroup 包含了一组EventLoop,可以理解成Netty的线程池

EventLoopGroup线程数有多大

// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup();
// IO线程,处理注册其上Channel的I/O事件及其他Task
EventLoopGroup worker = new NioEventLoopGroup();

默认为当前CPU核心数*2

对于boss group,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动

小结

Bootstrap的作用及类型

1,配置网络参数并引导netty程序启动,

2,ServerBootstrap作用于服务端,Bootstrap作用于客户端

Channel的作用

1,底层绑定的是一个网络连接socket,故可以获取网络通道状态

2,获取相关网络配置参数

3,处理相关操作,如IO

EventLoop和EventLoopGroup 的作用

1,EventLoop 检测并协调处理所注册Channel的相关事件,

2,EventLoop 绑定在一个线程上

3,EventLoopGroup 包含了一组EventLoop,可以理解成Netty的线程池

ChannelPipeline

1,ChannelHandler 是Netty的事件处理器,在其对应的事件回调方法中处理事件,是开发者介入的入口

2,ChannelHandlerContext 持有ChannelHandler并构成双向链表,handler基于它连接pipeline和channel

3,ChannelPipeline 提供了一个了容器,持有ChannelHandlerContext 构成的链表,并连接channel

在刚开始的时候,ChannelPipeline只有head和tail,当接收到了客户端的请求之后,准备初始化ChannelPipeline的时候会回调Handler方法,把Handler方法全都加入在管道当中。

channel中存放着这个套接字的各种信息。

ChannelHandler如何复用?

添加@Sharable注解

SimpleChannelInboundHandler

对于编写Netty数据入站处理器,可以选择继承ChannelInboundHandlerAdapter,也可以选择继承SimpleChannelInboundHandler,区别是什么?

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;

        try {
            if (this.acceptInboundMessage(msg)) {
                this.channelRead0(ctx, msg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (this.autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }

        }

    }

继承SimpleChannelInboundHandler需要重写channelRead0方法,且可以通过泛型指定msg类型

SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuf资源

ByteBuf的内存模型

1,堆缓冲区(HeapByteBuf):内存分配在jvm堆,分配和回收速度比较快,可以被JVM自动回收,缺点是,如果进行socket的IO读写,需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有一定程度的下降。由于在堆上被 JVM 管理,在不被使用时可以快速释放。

2,直接缓冲区(DirectByteBuf):内存分配的是堆外内存(系统内存),相比堆内存,它的分配和回收速度会慢一些,但是将它的数据写入或从Socket Channel中读取数据到它时,由于减少了一次内存拷贝,速度比堆内存块(netty内部默认使用)

3,复合缓冲区(CompositeByteBuf):顾名思义就是将两个不同的缓冲区从逻辑上合并,让使用更加方便。

ByteBuf的分配

Netty提供了ByteBufAllocator接口,有两个重要实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator

1,PooledByteBufAllocator:实现了 ByteBuf 内存的池化,提高性能减少并最大限度地减少内存碎片,池化思想通过预先申请一块专用内存地址作为内存池进行管理,从而不需要每次都进行分配和释放(netty内部默认使用该方式,Android除外)

2,UnpooledByteBufAllocator:没有实现内存的池化,每次都会为ByteBuf对象重新分配内存

ByteBuf资源释放

1,unpool模式下,ByteBuf如果采用的是堆缓冲区可以由GC回收,但是如果采用的是直接缓冲区,就不受GC的管理,就得手动释放否则会发生内存泄露

2,pool模式下,某个ByteBuf对象关联的内存何时可以被归还给pool

引用计数法:ByteBuf对象的引用计数为0时,它关联的内存要么直接释放要么归还到池,Netty提供了ReferenceCounted接口

ReferenceCountUtil.release(buf)

什么时候释放?

1,数据入站能自动释放的地方:TailContext(前提是事件能传递到这),继承SimpleChannelInboundHandler

2,数据出站能自动释放的地方:HeadContext 出站消息一般是由应用所申请,到达最后一站时,经过一轮复杂的调用,在flush完成后终将被release掉

1,对于入站消息:

  • 随着事件向后传递,依然将原Bytebuf对象向后传,如果能到TailContext则会自动释放该ByteBuf

  • 事件传递的过程中,向后传递的是新的ByteBuf对象,则有必要手动释放原ByteBuf

  • 在可以的情况下选择继承SimpleChannelInboundHandler是个不错的选择

2,对于出站消息:

  • 应用的ByteBuf对象能走到HeadContext,则在flush后会释放

  • 事件传递的过程中,向后传递的是新的ByteBuf对象,则有必要手动释放原ByteBuf

通过Wrap操作可以快速转换或得到一个ByteBuf对象,Unpooled 工具类中提供了很多重载的wrappedBuffer方法

Netty Future

Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 Listener 机制,任务结束会回调Listener

    public void testNettyFuture2() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(1);

        Future<String> future = group.submit( () ->{
            log.info("---异步线程执行任务开始----");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("---异步线程执行任务结束----");
            return "hello netty future";
        });

        //基于future 添加监听
        /*future.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                future.get();
            }
        });*/
        future.addListener( future1 -> {
            log.info("---收到异步线程执行任务结果通知----执行结果是;{}",future1.get());
        });

        log.info("---主线程向后执行----");
        TimeUnit.SECONDS.sleep(10);
    }

Netty Promise

1,Netty的Future基于JDK Future增加了Listener机制,但是Listener的触发则是需要等到异步任务执行结束,而任务大都是一个Runnable或者Callable,也就是需要等到其run()方法或者call()方法返回后才会触发Listener的执行,整个过程无法人为干预。

2,Netty的 Promise接口扩展了Netty的Future接口,它可以设置异步执行的结果并触发Listener,比如在IO操作过程,不管是顺利完成、还是发生异常,都可以设置Promise的结果,并且通知Promise的

public void testNettyPromise() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Promise promise = new DefaultPromise(group.next());// promise 绑定到 eventLoop上
        
        group.submit(()->{
            log.info("---异步线程执行任务开始----");
            try {
                // int i = 1/0;
                TimeUnit.SECONDS.sleep(3);
                log.info("first");
                promise.setSuccess("hello netty promise");

                TimeUnit.SECONDS.sleep(3);
                log.info("two");
                log.info("---异步线程执行任务结束----");
                log.info("finish");

            } catch (Throwable e) {
                log.info("exception ={}",e.getMessage());
                promise.setFailure(e);
            }
        });

        promise.addListener( future -> {
            if (future.isSuccess()) {
                log.info("----异步任务执行结果:{}",future.get());
            }else {
                log.info("异步任务执行失败,exception={}",future.cause().getMessage());
            }
        });

        log.info("---主线程----");
        TimeUnit.SECONDS.sleep(10);

    }


http://www.kler.cn/a/273166.html

相关文章:

  • 【CSS】——基础入门常见操作
  • 云轴科技ZStack在CID大会上分享VF网卡热迁移技术
  • C++面向对象设计模式——单例模式
  • (七)Python运算符和优先级
  • LeetCode:633. 平方数之和(Java)
  • 二、应用层,《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》
  • python中的闭包
  • 使用 ONLYOFFICE API 构建 Java 转换器,在 Word 和 PDF 之间进行转换
  • 本地mysql测试成功后上传至云服务器出现了这么多问题?
  • 一.Netedit的简要介绍
  • 修改/etc/resolve.conf重启NetworkManager之后自动还原
  • leetcode刷题(javaScript)——动态规划相关场景题总结
  • 微信小程序 nodejs+vue+uninapp学生在线选课作业管理系统
  • 【概率论中的两种重要公式:全概率和贝叶斯】
  • js判断对象是否有某个属性
  • Android SystemServer进程解析
  • MapReduce面试重点
  • 详解Python中的缩进和选择
  • 搜索二叉树迭代和递归的两种*简单*实现方式
  • python--剑指offer--题目目录-学习计划
  • Spring Bean的生命周期流程
  • ElasticSearch架构设计
  • 中国移动端第三方输入法市场专题2024
  • 掘根宝典之C++迭代器简介
  • C/C++中{}的用法总结(全)
  • 后端工程师快速使用vue和Element