当前位置: 首页 > article >正文

Netty入门

文章目录

  • Netty入门
    • 快速入门
    • 组件
      • EventLoop
      • Channel
      • Future & Promise
      • Handler & Pipeline
      • ByteBuf
        • 创建
        • 组成
        • 扩容
        • 读写操作
        • 回收
        • slice
        • CompositeByteBuf
        • Unpooled
    • 双向通信

Netty入门

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

快速入门

1)添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

2)服务端代码

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {

                @Override
                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                    System.out.println(s);
                }
            });
        }
    }).bind(8080);

3)客户端代码

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("localhost",8080)
    .sync()
    .channel()
    .writeAndFlush(new Date() + "hello world");

组件

EventLoop

EventLoop(事件循环对象) 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

EventLoopGroup(事件循环组) 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

以一个简单的实现为例:

// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

输出

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98

演示 NioEventLoop 处理 io 事件:

DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
    // 第一个nio事件组只负责accept操作
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch)  {
            // 第二个nio事件组处理
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            // 用默认事件组来处理读操作
            ch.pipeline().addLast(normalWorkers,"myhandler",
                                  new ChannelInboundHandlerAdapter() {
                                      @Override
                                      public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                          ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                                          if (byteBuf != null) {
                                              byte[] buf = new byte[16];
                                              ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                                              log.debug(new String(buf));
                                          }
                                      }
                                  });
        }
    }).bind(8080).sync();

NioEventLoop 处理普通任务:

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
    log.debug("normal task...");
});

NioEventLoop 处理定时任务:

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);

Channel

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 1

channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");

在客服端代码中,connect返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象

connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能立刻获得到正确的 Channel 对象,而需要通过sync()来等待连接。

除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel());
channelFuture.addListener((ChannelFutureListener) future -> {
    System.out.println(future.channel());
});

使用CloseFuture优雅地关闭事件组:

NioEventLoopGroup group = new NioEventLoopGroup();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        log.debug("处理关闭之后的操作");
        group.shutdownGracefully();
    }
});

Future & Promise

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

同步处理:

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("set success, {}",10);
    promise.setSuccess(10);
});

log.debug("start...");
log.debug("{}",promise.getNow()); // 还没有结果
log.debug("{}",promise.get());

异步处理:

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

// 设置回调,异步接收结果
promise.addListener(future -> {
    // 这里的 future 就是上面的 promise
    log.debug("{}",future.getNow());
});

// 等待 1000 后设置成功结果
eventExecutors.execute(()->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("set success, {}",10);
    promise.setSuccess(10);
});

log.debug("start...");

处理失败抛异常:

DefaultEventLoop eventExecutors = new DefaultEventLoop();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

        eventExecutors.execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RuntimeException e = new RuntimeException("error...");
            log.debug("set failure, {}", e.toString());
            promise.setFailure(e);
        });

        log.debug("start...");
        log.debug("{}", promise.getNow());
        promise.get(); // sync() 也会出现异常,只是 get 会再用 ExecutionException 包一层异常

处理失败不抛异常:

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    RuntimeException e = new RuntimeException("error...");
    log.debug("set failure, {}", e.toString());
    promise.setFailure(e);
});

log.debug("start...");
log.debug("{}", promise.getNow());
promise.await(); // 与 sync 和 get 区别在于,不会抛异常
log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());

异步处理失败:

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

promise.addListener(future -> {
    log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
});

eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    RuntimeException e = new RuntimeException("error...");
    log.debug("set failure, {}", e.toString());
    promise.setFailure(e);
});

log.debug("start...");

Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(1);
                    ctx.fireChannelRead(msg); // 1
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(2);
                    ctx.fireChannelRead(msg); // 2
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(3);
                    ctx.channel().write(msg); // 3
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(4);
                    ctx.write(msg, promise); // 4
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(5);
                    ctx.write(msg, promise); // 5
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, 
                                  ChannelPromise promise) {
                    System.out.println(6);
                    ctx.write(msg, promise); // 6
                }
            });
        }
    })
    .bind(8080);

ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。

ctx.fireChannelRead(msg) 是 调用下一个入站处理器

ctx.write(msg, promise) 的调用会触发上一个出站处理器

ByteBuf

创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
组成

ByteBuf 由四部分组成

请添加图片描述

最开始读写指针都在 0 位置

扩容

ByteBuf会自动扩容,扩容规则是:

  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 210=1024(29=512 已经不够了)
  • 扩容不能超过 max capacity 会报错
读写操作

写入字节:

buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

读取字节

System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

要重复读的话可以在 read 前先做个标记 mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
buffer.resetReaderIndex();
log(buffer);
回收

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

回收基本规则是:谁是最后使用者,谁负责 release

slice

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

请添加图片描述

CompositeByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝

有两个 ByteBuf

CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);
Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

双向通信

服务端:

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf byteBuf = (ByteBuf) msg;
                    try {
                        System.out.println(byteBuf.toString(Charset.defaultCharset()));
                        ByteBuf response = ctx.alloc().buffer();
                        response.writeBytes(byteBuf);
                        ctx.writeAndFlush(response);
                    } finally {
                        byteBuf.release();
                    }
                }
            });
        }
    })
    .bind(8080);
}

客户端:

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new StringEncoder());
            sc.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf byteBuf = (ByteBuf) msg;
                    System.out.println(byteBuf.toString(Charset.defaultCharset()));
                    byteBuf.release();
                }
            });
        }
    })
    .connect("localhost", 8080)
    .sync()
    .channel();
channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while(true){
        String s = scanner.nextLine();
        if("q".equals(s)){
            channel.close();
            break;
        }
        channel.writeAndFlush(s);
    }
}).start();

http://www.kler.cn/news/323733.html

相关文章:

  • 机器学习(4):机器学习项目步骤(一)——定义问题
  • Pytorch实现Transformer
  • 激光slam学习笔记4--slam_in_autonomous_driving编译碰到问题汇总
  • 基于Python+flask+MySQL+HTML的全国范围水质分析预测系统,可视化用echarts,预测算法随机森林
  • [Redis][Zset]详细讲解
  • FastAPI前置知识及快速入门
  • Python入门:类的异步资源管理与回收( __del__ 方法中如何调用异步函数)
  • CMake构建学习笔记18-cpp-httplib库的构建
  • 【目标检测】隐翅虫数据集386张VOC+YOLO
  • 【web阅读记录】web相关概念及知识整理
  • 【机器学习】——支持向量机
  • uni-app 封装websocket 心跳检测,开箱即用
  • SCAU学习笔记 - 面向对象程序设计课后习题
  • GAMES101(20节,动画和仿真)
  • 如何提升JavaScript安全性,保护应用程序免受威胁
  • Ubuntu 离线安装 docker
  • 深度对比:etcd、Consul、Zookeeper 和 Nacos 作为注册中心和配置中心的优势与劣势
  • 前端请求音频返回pcm流进行播放
  • 大数据毕业设计选题推荐-豆瓣电子图书推荐系统-数据分析-Hive-Hadoop-Spark
  • 【Anti-UAV410】论文阅读
  • Miniforge详细安装教程(macOs和Windows)
  • 尚品汇-自动化部署-Jenkins的安装与环境配置(五十六)
  • SpringBoot gateway如何支持跨域?
  • Spring的IOC和DI入门案例分析和实现
  • AWS注册时常见错误处理
  • RabbitMQ——消息的可靠性处理
  • Docker-Compose:简化Docker容器编排的利器
  • [vulnhub] Prime 1
  • 从哪里下载高清解压视频素材?推荐五个优质素材资源网站
  • RtspServer:轻量级RTSP服务器和推流器