当前位置: 首页 > article >正文

NIO基础

文章目录

  • NIO基础
    • 1. 三大组件
      • 1.1 Channel
      • 1.2 Buffer
      • 1.3 Selector
    • 2. ByteBuffer
      • 2.1 使用方式
      • 2.2 常见方法
      • 2.3 解决黏包问题
    • 3. 网络编程
      • 3.1 阻塞
      • 3.2 非阻塞
      • 3.3 多路复用
      • 3.4 利用多线程优化

NIO基础

1. 三大组件

1.1 Channel

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

channel
buffer

常见的 Channel 有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

1.2 Buffer

Buffer 是一块可以写入数据,然后可以从中读取数据的内存块。

它与数组类似,但是提供了对数据的结构化访问,并且可以跟踪已经读写了多少数据。

常见的 buffer 有:

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.3 Selector

Selector 是 NIO 中实现多路复用的关键组件,允许单个线程管理多个 Channel。

Selector 能够监控多个注册到它的 Channel 上是否有事件发生(如连接建立、数据到达等)。这样,一个单独的线程可以通过轮询 Selector 来判断一组 Channel 是否有就绪状态的事件,从而只需要少量的线程就能处理大量的客户端请求。

使用 Selector 可以显著减少系统资源的开销,特别适用于需要同时处理大量网络连接的情况。

selector
thread
channel
channel
channel

2. ByteBuffer

2.1 使用方式

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复 1~4 步骤

示例:

@Slf4j
public class BufferDemo {
    public static void main(String[] args) {
        try (RandomAccessFile file =
                     new RandomAccessFile("test.txt", "rw")){
            ByteBuffer buffer = ByteBuffer.allocate(10);
            FileChannel channel = file.getChannel();
            while (true){
                int read = channel.read(buffer);
                if(read == -1)break;
                buffer.flip();
                while(buffer.hasRemaining()){
                    log.info("读取到:{}",(char)buffer.get());
                }
                buffer.clear();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.2 常见方法

分配空间:

Bytebuffer buf = ByteBuffer.allocate(16);

写入数据有两种办法:

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
buf.put((byte)127);

读取数据同样有两种办法:

  • 调用 channel 的 write 方法
  • 调用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset:

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

字符串与 ByteBuffer 互转:

ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

debug(buffer1);
debug(buffer2);

CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

2.3 解决黏包问题

将错乱的数据恢复成原始的按 \n 分隔的数据:

public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.allocate(32);
    source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
    split(source);

    source.put("w are you?\nhaha!\n".getBytes());
    split(source);
}

private static void split(ByteBuffer source) {
    source.flip();
    int oldLimit = source.limit();
    for (int i = 0; i < oldLimit; i++) {
        if (source.get(i) == '\n') {
            System.out.println(i);
            ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
            // 0 ~ limit
            source.limit(i + 1);
            target.put(source); // 从source 读,向 target 写
            debugAll(target);
            source.limit(oldLimit);
        }
    }
    source.compact();
}

3. 网络编程

3.1 阻塞

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
  • SocketChannel.read 会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));

// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
    log.debug("connecting...");
    SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
    log.debug("connected... {}", sc);
    channels.add(sc);
    for (SocketChannel channel : channels) {
        // 5. 接收客户端发送的数据
        log.debug("before read... {}", channel);
        channel.read(buffer); // 阻塞方法,线程停止运行
        buffer.flip();
        debugRead(buffer);
        buffer.clear();
        log.debug("after read...{}", channel);
    }
}

3.2 非阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
    SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
    if (sc != null) {
        log.debug("connected... {}", sc);
        sc.configureBlocking(false); // 非阻塞模式
        channels.add(sc);
    }
    for (SocketChannel channel : channels) {
        // 5. 接收客户端发送的数据
        int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
        if (read > 0) {
            buffer.flip();
            debugRead(buffer);
            buffer.clear();
            log.debug("after read...{}", channel);
        }
    }
}

3.3 多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

处理read事件服务器代码:

public static void main(String[] args) throws IOException {
    // 1. 创建 selector, 管理多个 channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的联系(注册)
    // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // key 只关注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
        // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
        selector.select();
        // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
            iter.remove();
            log.debug("key: {}", key);
            // 5. 区分事件类型
            if (key.isAcceptable()) { // 如果是 accept
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                // 将一个 byteBuffer 作为附件关联到 selectionKey 上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                log.debug("{}", sc);
                log.debug("scKey:{}", scKey);
            } else if (key.isReadable()) { // 如果是 read
                try {
                    SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                    // 获取 selectionKey 上关联的附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                    if(read == -1) {
                        key.cancel();
                    } else {
                        split(buffer);
                        // 需要扩容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                }
            }
        }
    }
}

处理write时间服务端代码:

public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(8080));
    ssc.configureBlocking(false);
    ssc.register(selector,SelectionKey.OP_ACCEPT);
    while(true){
        selector.select();
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            try {
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    StringBuilder sb = new StringBuilder();
                    for(int i=0;i<10000;i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    sc.write(buffer);
                    if(buffer.hasRemaining()){
                        sc.register(selector,SelectionKey.OP_WRITE,buffer);
                    }
                }else if(key.isWritable()){
                    ByteBuffer attachment = (ByteBuffer)key.attachment();
                    SocketChannel sc = (SocketChannel)key.channel();
                    sc.write(attachment);
                    if(!attachment.hasRemaining()){
                        key.cancel();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                key.cancel();
            }
        }
    }

}

3.4 利用多线程优化

public class ChannelDemo7 {
    public static void main(String[] args) throws IOException {
        new BossEventLoop().register();
    }


    @Slf4j
    static class BossEventLoop implements Runnable {
        private Selector boss;
        private WorkerEventLoop[] workers;
        private volatile boolean start = false;
        AtomicInteger index = new AtomicInteger();

        public void register() throws IOException {
            if (!start) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(8080));
                ssc.configureBlocking(false);
                boss = Selector.open();
                SelectionKey ssckey = ssc.register(boss, 0, null);
                ssckey.interestOps(SelectionKey.OP_ACCEPT);
                workers = initEventLoops();
                new Thread(this, "boss").start();
                log.debug("boss start...");
                start = true;
            }
        }

        public WorkerEventLoop[] initEventLoops() {
            //        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i);
            }
            return workerEventLoops;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    boss.select();
                    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();
                            sc.configureBlocking(false);
                            log.debug("{} connected", sc.getRemoteAddress());
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Slf4j
    static class WorkerEventLoop implements Runnable {
        private Selector worker;
        private volatile boolean start = false;
        private int index;

        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                worker = Selector.open();
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            tasks.add(() -> {
                try {
                    SelectionKey sckey = sc.register(worker, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    worker.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            worker.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    worker.select();
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
                    Set<SelectionKey> keys = worker.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                    log.debug("{} message:", sc.getRemoteAddress());
                                    debugAll(buffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

http://www.kler.cn/a/326266.html

相关文章:

  • 获得PostgreSQL中级认证后,可以从事哪些工作岗位?
  • 消息中间件类型介绍
  • 【数据库】四、数据库管理与维护
  • 在iStoreOS上安装Tailscale
  • Linux pget 下载命令详解
  • ubuntu 20.04 安装 5.4 内核
  • Java集合ArrayList的扩容原理是什么?附源码讲解+举例说明
  • DSPy101
  • 有源滤波器故障怎么处理
  • Redis哨兵模式的搭建以及配置参数简介
  • websocket接收文心一言示例
  • 【系统架构设计师】经典论文:轮软件三层架构设计
  • ClickHouse | 入门
  • Python鸭子类型解释
  • ubuntu 下载安装 启动盘创建器,将ubuntu22.04的Ios文件,制作成启动盘
  • C++(string字符串、函数)
  • Python知识点:如何使用Airflow进行ETL任务调度
  • 2024 Python3.10 系统入门+进阶(十六):正则表达式
  • 如何提升网页加载和跳转速度:Flask 模板渲染 vs Nginx 静态资源处理
  • 数商云B2B2C商城系统如何帮企业降本增效
  • 【Linux】模拟实现一个shell
  • 六、设计模式-6.2、代理模式
  • 鸿蒙 如何退出 APP
  • JSON字符串转换成对象
  • 嵌套的JSON字符串解析成Java对象
  • 瑜伽馆预约小程序,在线瑜伽课程预约系统