当前位置: 首页 > article >正文

nio多线程版本

多线程多路复用

多线程NIO,,就是多个线程,每个线程上都有一个Selector,,,比如说一个系统中一个线程用来接收请求,,剩余的线程用来读写数据,,每个线程独立干自己的事,,,
一个线程的多路复用,,虽然不会卡住,,但是执行单个事件的时间过长,也会长时间卡在那里,,,需要开启多个线程,,但是多个线程中执行代码的顺序是不可控的,,一般是在主线程接收到一个新的连接之后,再用子线程中的Selector去关注返回的SocketChannel,
selector.select()是在子线程中执行的,,,关注事件是在主线程执行的,,如果子线程中的selector.select()先阻塞住了,,关注事件的代码就必须等到有新的事件到来,才会往下执行

如果子线程想要监听到这个事件,,注册事件的代码就必须在 selector.select()阻塞的代码的前面,,,但是这个是不可控的,,就需要使用selector.wakeup() 唤醒这个selector.select() 阻塞,,唤醒了之后,后面关注事件的代码就执行了,后面监听到这个关注事件,就会去处理

public class MultiThreadServer {

    public static void main(String[] args) throws IOException {

        Thread.currentThread().setName("boss");

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);

        ssc.bind(new InetSocketAddress(8080));


        // 可用的核心数   ===》 如果在docker上,,拿到的是物理cpu个数,,而不是容器申请时的个数,,,在jdk10才修复
        int i1 = Runtime.getRuntime().availableProcessors();

        Worker[] workers = new Worker[2];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-"+i);
        }

        AtomicInteger index =new AtomicInteger(0);

        while (true){
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    System.out.println("客户端已进入"+sc);
                    sc.configureBlocking(false);


                    System.out.println("before register...");

                    // 轮询选择器,选择一个线程执行
                    workers[index.getAndIncrement() % workers.length].register(sc);

//                    worker01.register(sc);
                    // 前面初始化 selector.select()阻塞了
//                    SelectionKey scKey = sc.register(worker01.selector, 0, null);
//                    scKey.interestOps(SelectionKey.OP_READ);



                    System.out.println("after register...");
//                    sc.register()
                }
            }
        }




    }




   static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;


        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();


        private volatile  boolean start = false;// 是否初始化

        public Worker(String name) {
            this.name = name;
        }
        // 初始化线程和Selector
        public void register(SocketChannel sc) throws IOException {

            if (!start){
                selector = Selector.open();

                thread = new Thread(this,name);
                thread.start();

                start = true;
            }

            queue.add(()->{
                try {
                    sc.register(selector,SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    throw new RuntimeException(e);
                }
            });


            selector.wakeup();

//            sc.register(selector,SelectionKey.OP_READ);
//            // 唤醒阻塞,,, 阻塞住了,,下面的register就无法到达
//            selector.wakeup();
        }

        @Override
        public void run() {

            while (true){
                try {

                    selector.select();

                    // 最开始的时候  ,,,run如果先执行,,,queue里面是空的,,注册逻辑放在select()前面也是一样不会将 socketChannel注册进去,, 需要唤醒
                    Runnable task = queue.poll();
                    if (task != null){
                        task.run();
                    }

                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                    while (iterator.hasNext()){

                        SelectionKey key = iterator.next();
                        iterator.remove();

                        if (key.isReadable()){
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            System.out.println(Thread.currentThread().getName()+"name");
                            debugAll(buffer);
                        }else if(key.isWritable()){

                        }

                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

            }
        }
    }
}
名词
  • 阻塞IO: 用户线程会一直等待,直到数据准备好,才返回,,,就是线程停止,等待数据

  • 非阻塞IO: 用户程序空间操作系统的内核空间 ==》 操作系统会立即返回,,用户线程始终在运行,并没有停下来

  • 多路复用: 通过Selector监测事件,,检测到有事件就会到操作系统内核空间去执行

  • 同步: 线程自己去获取结果

  • 异步: 线程自己不去获取结果,,由另一个线程送结果 (至少有两个线程)

  • 异步IO: 异步都是非阻塞的,,一个线程执行,,通过另一个线程返回结果,,,

阻塞IO是同步的,,也叫同步阻塞,,,
非阻塞IO也是同步的,也叫同步非阻塞
多路复用也是同步的,,只是将所有的事件都集中到一起处理

零拷贝

在这里插入图片描述

一般的文件读取,都是要经过内核空间用户空间 ,再从用户空间 拷贝到 缓冲区, 一个文件要拷贝很多次,,才能发送出去,,
零拷贝: 就是让文件拷贝,不再经过用户空间,,直接就用操作系统内核空间里面的数据。。
零拷贝适合那种小文件拷贝,,因为大文件会占用很多内核缓冲区,,可能会影响别的IO操作

// 堆外内存(direct buffer) : 直接分配在堆外的内存,减少从堆内存到直接内存的拷贝
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
AIO(Asynchronous IO) 异步输入输出

非阻塞IO,,需要一个回调函数,去传递 执行完成之后返回的结果

public class Demo01 {

    /**
     * linux 对异步IO不友好,,, 底层只是用多路复用模拟了一个异步IO,,,性能上没有优势
     * window系统通过IOCP实现了真正的异步IO
     *
     *
     */

    public static void main(String[] args) throws IOException, InterruptedException {


        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 异步IO必须是多个线程,,,一个线程发起,一个线程送结果
        Path path = Paths.get("/Users/chenjie/code/learn/learn-netty/learn-netty/netty01/src/main/resources/word.txt");
        System.out.println(Files.exists(path));
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(8);
            ByteBuffer attachBuffer = ByteBuffer.allocate(8);

            System.out.println("read begin:"+Thread.currentThread().getName());
            /**
             * params: ByteBuffer
             * params2: 读取的起始位置
             * params3: 附件 : 万一一次读不完,,需要一个bytebuffer接着读
             * params4: 真正结果出来了,调用这个回调方法
             */
            channel.read(buffer,0,attachBuffer,new CompletionHandler<Integer, ByteBuffer>(){
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    // 当你的文件正确读取完毕后

                    attachment.flip();
                    debugAll(attachment);
                    System.out.println(Thread.currentThread().getName()+"hehe");

                    countDownLatch.countDown();
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("exc = " + exc);
                    System.out.println(exc.getMessage()+"e");
                }
            });
            System.out.println("read finished");
            countDownLatch.await();

        } catch (IOException e) {
            throw new RuntimeException(e);
        }



    }


}

http://www.kler.cn/a/569530.html

相关文章:

  • 大夏龙雀科技4G Cat1 CT511-AT0 MQTT联网实战教程
  • C++格式讲解
  • PhyloSuite v1.2.3安装与使用-生信工具049
  • 大模型学习笔记-基于《Deep Dive into LLMs like ChatGPT》
  • 第1章 基础网络和安全工具(网络安全防御实战--蓝军武器库)
  • 谈谈 Node.js 中的模块系统,CommonJS 和 ES Modules 的区别是什么?
  • 不要升级,Flutter Debug 在 iOS 18.4 beta 无法运行,提示 mprotect failed: Permission denied
  • ubuntu:桌面版磁盘合并扩容
  • Stapler: 1靶场渗透测试
  • 中间件专栏之Redis篇——Redis的三大持久化方式及其优劣势对比
  • LeetCode-81. 搜索旋转排序数组 II
  • Java 大视界 -- Java 大数据中的时间序列数据异常检测算法对比与实践(103)
  • server.servlet.session.timeout: 12h(HTTP 会话的超时时间为 12 小时)
  • k8s学习记录:环境搭建二(基于Kubeadmin)
  • 【线性代数】3向量
  • Mybatis是如何进行分页的?与Mybatis-plus的区别在哪里?
  • 【服务器】Nginx
  • 【AD】3-10 原理图PDF导出
  • 力扣hot 100之矩阵四题解法总结
  • 盛京开源社区加入 GitCode,书写东北开源生态新篇章