当前位置: 首页 > article >正文

netty中Future和ChannelHandler

netty中的Future,继承自 jdk中的Future,, jdk中的Future,很垃圾,只能同步阻塞获取结果,,,

netty中的Future进行了升级,,可以addListener()异步获取结果,,可以isSuccess()判断任务成功还是失败,,

  • jdk的Future
    • get()
    • isDone()
    • cancel() : 取消当前任务
  public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        Future<Integer> future = service.submit(() -> {
            log.debug("running...");
            Thread.sleep(2000);
            return 2;
        });

        Integer i = future.get();
        log.debug("i = " + i);
    }
  • netty中的Future
    • isSuccess() : 判断任务是否成功
    • sync() : 同步等待,,任务不成功会抛错
    • getNow() : 获取结果,没有就返回null
    • await() : 同步等待,,任务不成功不会报错,,后面通过isSuccess()判断是否成功
    • addListener() : 任务结束回调
  public static void main(String[] args) {
        // netty中的线程池  eventLoop,, eventloop中就一个线程
        NioEventLoopGroup group = new NioEventLoopGroup(2);

        EventLoop eventLoop = group.next();

        Future<String> future = eventLoop.submit(() -> {
            Thread.sleep(2000);
            return "hehe";
        });


        String now = future.getNow();
        System.out.println("now = " + now);
        boolean success = future.isSuccess();
        System.out.println("success = " + success);


        future.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                Object now1 = future.getNow();
                System.out.println("now1 = " + now1);
                boolean success = future.isSuccess();
                System.out.println("success = " + success);
            }
        });

    }
  • netty中的Promise
    继承自netty的Future,
    Promise可以设置成功和失败,,不用等到任务结束
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(2);
        EventLoop eventLoop = group.next();
        // 主动创建promise  ===> 结果的容器,
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            System.out.println("开始计算");
            try {
                int i = 1/0;
                Thread.sleep(1000);
                promise.setSuccess(1000);
            } catch (InterruptedException e) {
               e.printStackTrace();
                promise.setFailure(e);
            }

//

        }).start();


        Integer i = promise.get();
        System.out.println("i = " + i);
    }
ChannelHandler

netty中handler分为两类:

  • ChannelInboundHandler : 入站,, 读取数据,,,channel按照添加顺序依次执行
  • ChannelOutboundHandler :出站 : 发送数据,,channel 逆序执行

channel.wirte() : 从末尾逆序执行
ctx.wirte() : 是从当前的handler,往前面找ChannelOutboundHandler执行

 public static void main(String[] args) {

        new ServerBootstrap()
                .group(new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                        // 添加处理器  ,,, netty会自动添加两个handler,,一个叫head,,一个叫tail,,,
                        // 底层是 双向链表
                        pipeline.addLast("handle01",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg:{}",msg);


                                ByteBuf byteBuf = (ByteBuf) msg;
                                String s = byteBuf.toString(Charset.defaultCharset());

                                // 调用下一个handle       ctx.fireChannelRead(msg);,,并且将处理完成的结果,传递给下一个handler
                                super.channelRead(ctx, s);
                            }
                        });

                        pipeline.addLast("handle02",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg222:{}",msg);

                                User user = new User();
                                user.setName(((String) msg));
                                super.channelRead(ctx, user);
                            }
                        });

                        pipeline.addLast("handle03",new ChannelInboundHandlerAdapter(){
                            // 入站的handler,,一般关心的 read

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg333:{}",msg);
                                super.channelRead(ctx, msg);

                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                            }
                        });



                        // 出站是,,从后面往前走  ,,只有有写出的时候,才会触发出站方法,,,,
                        pipeline.addLast("handle04",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg444:{}",msg);
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("handle05",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg555:{}",msg);
                                super.write(ctx, msg, promise);
                            }
                        });

                    }
                }).bind(new InetSocketAddress(8080));
    }
EmbeddedChannel 模拟channel执行
    public static void main(String[] args) {

        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("msg = " + msg);
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception {
                System.out.println(4444);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1,h2);
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));

        channel.writeOutbound(channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes())));
    }

http://www.kler.cn/a/574370.html

相关文章:

  • 鸿蒙Next网络请求~上传文件pdf
  • SCI1区TOP:自适应学习粒子群算法SLPSO,深度解析+性能实测
  • 23种设计模式之单例模式(Singleton Pattern)【设计模式】
  • 智能文档制度管理系统技术
  • istio从入门到精通(1)
  • vue+neo4j 四大名著知识图谱问答系统
  • es 慢查询引起 cpu报警处理方法
  • 计算机毕业设计Python+Django+Vue3微博数据舆情分析平台 微博用户画像系统 微博舆情可视化(源码+ 文档+PPT+讲解)
  • Java,Golang,Rust 泛型的大体对比小记
  • 验证测试 .NET 10 预览版的 Windows 窗体中的剪贴板新增功能
  • 【1Panel】平替宝塔面板!1Panel面板香橙派部署结合内网穿透远程管理
  • 第5章:vuex
  • C++ Primer 拷贝控制和资源管理
  • 嵌入式 ARM Linux 系统构成(2):Linux内核层
  • 文本处理Bert面试内容整理-如何使用BERT进行微调?
  • FX-枚举
  • Python编程中常见的10个案例
  • Java爬虫获取淘宝商品搜索接口(item_search)的详细解析
  • C++ Primer 拷贝、赋值与销毁
  • Spring WebFlux 入门指南