当前位置: 首页 > article >正文

IO详解(BIO、NIO、实战案例、底层原理刨析)

文章目录

  • IO详解(BIO、NIO、实战案例、底层原理刨析)
    • 🌎 IO
    • 🪐 同步、异步、阻塞、非阻塞
    • ⚡ BIO
        • 👽 简介
        • 😎 案例
    • 🚀 NIO
        • ✈️ 介绍
        • 🚗 Buffer(缓冲)
        • 🛸 Channel(通道)
        • 🛥️ Selector(选择器)
        • 🚍 案例
        • 🚩 源码解析

源码地址: IO模型详解.md · 小Liu/IO模型学习 - Gitee.com;

IO详解(BIO、NIO、实战案例、底层原理刨析)

文章理解若有误,烦请指出

🌎 IO

​ 我们先来了解一下IO,我们知道冯诺依曼结构体系中,计算机分为:计算器、控制器、存储器、输入设备、输出设备。我们平常所说的IO其实就是输入设备将数据交给CPU和内存,CPU和内存把处理过后的数据交给输出设备,这个过程就叫做IO

​ 我们知道在操作系统中,一个进程的地址空间划分为用户空间内核空间,从应用程序的视角来看的话,我们的应用程序对操作系统的内核发起 IO 请求调用(系统调用),操作系统负责的内核执行具体的 IO 操作。也就是说,我们的应用程序实际上只是发起了 IO 操作的请求调用而已,具体 IO 的执行是由操作系统的内核来完成的。

🪐 同步、异步、阻塞、非阻塞

​ 同步、异步、阻塞、非阻塞的概念是很容易搞混的,同步和异步是对于被调用者来说的。而阻塞和非阻塞是对于调用者来说的。

  • 同步:比如A调用B,而B不会立马返回响应,B处理完成之后才会给A返回响应,这种情况就叫做同步。
  • 异步:比如A调用B,B立马给A返回响应,告诉A我正在处理了,等B处理完之后,B会再告诉A,我处理完了,这就叫异步。
  • 阻塞:比如A调用B,A会被挂起一直等待B处理完成,才进行别的工作任务,这就是阻塞
  • 非阻塞:比如A调用B,A不会被挂起一直等待B处理完成,而是去执行别的操作,A可以通过轮询、提供回调函数给B,或者监听一个消息队列或者时间,B完成工作后将结果放入到消息队列中,来看B是否完成工作,这就是非阻塞

​ 通过同步、异步、阻塞、非阻塞进行组合,我们可以得出以下三种经典的IO模型:BIO(同步阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)、理论上来说异步阻塞是不存在的,但是其实在业务场景中异步阻塞是存在的,我们来个举例说明

异步阻塞【业务上的】:线程A先调用B,B立马返回响应给A后,B执行真正的任务,这时候A收到了响应,继续执行,A再调用了C,C也立马返回响应给A后,C执行真正的任务,这时候将A去循环阻塞的轮询B和C,假设这时候C完成了,A轮询到了之后,对C进行处理,然后继续轮询,知道B也完成后,A才结束阻塞。

总结:同步和异步是相对于两个线程是否在同一时间做不同的事。但是阻塞和非阻塞在理论上和业务上有点细微的差别

  • 理论上来说:阻塞和非阻塞是指一个线程是否被挂起,而同步和异步是指两个线程的调用顺序和交互方式,所以理论上来说异步阻塞不存在

  • 业务上来说:阻塞是指一个线程是否在等待某个事情的完成,而不一定是线程被挂起,所以说业务上来说异步阻塞是存在的

​ 接下来我们来分别了解一下IO模型。

⚡ BIO

👽 简介

​ 我们看一下BIO的一个模型图,在Java中,像我们平时使用的IO流就是一种同步阻塞的IO,在执行read操作的时候,线程会阻塞,等待内核把数据准备好了,线程才继续执行。

我们来了解一下什么是read:read是一种系统调用,用于读取文件描述符对应的数据【注意,读取socket读取数据的系统调用是recvfrom 】,如果描述符没有数据可读,read 调用会阻塞,直到有数据可用或文件被关闭,这也是为什么BIO会阻塞的原因

在这里插入图片描述

​ 下面我们通过网络通讯的角度来写一个同步阻塞的案例

😎 案例
  • BIO服务端

    /**
     * 同步阻塞IO服务端
     * @author Liu Hanlin
     * @create 2024-10-25 0:30
     */
    public class BIOServer {
        public static void main(String[] args) {
            try (ServerSocket serverSocket = new ServerSocket(8888)) {
    
                System.out.println("【服务端】等待连接中...");
                while (true){
    
                    // 阻塞等待连接,收到连接后继续执行
                    Socket clientSocket = serverSocket.accept();
                    System.out.printf("【服务端】收到【客户端:%s】的连接\n", clientSocket.getRemoteSocketAddress());
                    handle(clientSocket);
                }
    
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        public static void handle(Socket socket) throws IOException {
    
            try (InputStream inputStream = socket.getInputStream()) {
                System.out.println("开始处理===");
                byte[] bytes = new byte[1024];
                while ( (inputStream.read(bytes)) != -1){
    
                    System.out.printf("收到【客户端】消息:%s\n", new String(bytes, StandardCharsets.UTF_8));
                }
            }catch (Exception e){
                System.out.println(e.getMessage());
            }
        }
    }
    
  • BIO客户端

    /**
     * 阻塞IO客户端
     * @author Liu Hanlin
     * @create 2024-10-25 0:59
     */
    public class BIOClient {
        public static void main(String[] args) throws IOException {
    
            OutputStream outputStream = null;
            try (Socket socket = new Socket("127.0.0.1", 8888)) {
    
                Scanner scanner = new Scanner(System.in);
    			System.out.println("输入发送消息(exit退出):");
                while (scanner.hasNext()){
    
                    String msg = scanner.next();
    
                    if("exit".equals(msg)){
                        scanner.close();
                        break;
                    }
                    outputStream = socket.getOutputStream();
                    outputStream.write(msg.getBytes());
                    System.out.printf("发送消息:%s\n", msg);
                }
    
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }finally {
                if (outputStream != null){
                    outputStream.close();
                }
            }
    
        }
    }
    

    我们运行上述可看到

    在这里插入图片描述

​ 启动一个客户端可以正常连接发送,我们再把客户端2启动,发现服务器端此时收不到客户端2的消息,因为此时是阻塞的,当我们把客户端1停止的时候,我们可以看到如下,客户端2 发送的消息瞬间就被接收到并且处理了。

在这里插入图片描述

​ 上述案例可以看到,我们使用传统的Socket进行网络连接请求发送的时候,一个线程只能处理一个客户端,需要完全处理完这个客户端后才能下一个客户端,这种情况如果在客户端数量非常多的时候,那么就会导致某些客户端响应速度过慢,接下来我们来介绍一些NIO

🚀 NIO

✈️ 介绍

​ 在Java中NIO可以看作同步非阻塞IO模型,也可以看做IO多路复用模型,我们先来看一下同步非阻塞的模型图,我们可以看到,线程会调用内核进行IO读取,但是在内核准备数据的时候,线程并没有阻塞,而是一直在反复调用read,那为什么同步非阻塞模型在资源还没有准备好的时候进行read系统调用不回阻塞呢,因为在次模型中,如果资源没有准备好,内核会快速返回一个erroy来表示资源还没有准备好,这样应用程序就不会被阻塞。

在这里插入图片描述

接下来我们再来看看IO多路复用技术的模型图

在这里插入图片描述

​ 在IO多路复用模型中我们可以看到,应用程序发起IO不再是使用read系统调用,而是使用selectpollepoll,我们来解释一下这三个系统调用。

  • select:将所有的连接(io连接或者socket连接)都放到一个描述符集合里,然后通过select系统调用将描述符集合拷贝的内核中,内核来遍历检查描述符资源是否准备好,如果有准备好的描述符,就将其拷贝回用户空间中,然后用户空间再将此描述符集合遍历,拿到准备好的描述符资源进行读写>
  • poll:poll实际上和select没有太大的本质差别,差别在于poll存储描述符采用动态数组链表来存储
  • epoll:在内核中用了一颗红黑树来存储来存储所有需要监听的描述符,像selectpoll新增监听的描述符时,需要把新增的描述符放到描述符集合里,然后再把整个集合传给内核,而epoll红黑树之后,只需要将新增监听的描述符传给内核,内核将新增的描述符添加到红黑树中。epoll在内核中还维护了一个链表用来存放就绪的描述符,当某个描述符有事件发生之后,通过回调函数的方式将其注册到就绪的描述符链表中,而用户态和内核之间传输就只用传输这个就绪的描述符链表,而且用户进程也不需要再遍历去查找就绪的描述符

​ 上述我们讲到了NIO的细节,我们在Java中提供了一个NIO的包,其实现原理也和上述IO多路复用类似。在Java的NIO中有三个核心的概念

  • Buffer(缓冲区):NIO 读写数据都是通过缓冲区进行操作的。读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。

  • Channel(信道):Channel 是一个双向的、可读可写的数据传输通道,NIO 通过 Channel 来实现数据的输入输出。通道是一个抽象的概念,它可以代表文件、套接字或者其他数据源之间的连接。

  • Selector(选择器):允许一个线程处理多个 Channel,基于事件驱动的 I/O 多路复用模型。所有的 Channel 都可以注册到 Selector 上,由 Selector 来分配线程来处理事件。

我们来详细介绍一下

🚗 Buffer(缓冲)

​ Buffer可以理解成一个数组,用来存储数据,在读写数据的时候,都是对Buffer进行操作,Buffer有几个重要的概念,我们来看一下

public abstract class Buffer {
    // 属性满足的关系: mark <= position <= limit <= capacity
    
    // 允许讲位置直接定位到该标记出,可选项
    private int mark = -1;
    // 下一个可以被读写的数据的位置,读写模式切换时,会归零
    private int position = 0;
    // 读写的边界,写模式下代表最多能写入的数据,通常等于capatity,读模式下表示buffer中数据的实际长度
    private int limit;
    // Buffer可以存储的最大数据量,创建时设置且不可改变;
    private int capacity;
}

​ 我们来看一下读写模式分别的图解

在这里插入图片描述

Buffer 对象不能通过 new 调用构造方法创建对象 ,只能通过静态方法实例化 Buffer

这里以 ByteBuffer为例进行介绍:

// 分配堆内存
public static ByteBuffer allocate(int capacity);
// 分配直接内存
public static ByteBuffer allocateDirect(int capacity);

常用方法:

  1. get : 读取缓冲区的数据
  2. put :向缓冲区写入数据

除上述两个方法之外,其他的重要方法:

  • flip :将缓冲区从写模式切换到读模式,它会将 limit 的值设置为当前 position 的值,将 position 的值设置为 0。
  • clear: 清空缓冲区,将缓冲区从读模式切换到写模式,并将 position 的值设置为 0,将 limit 的值设置为 capacity 的值。
🛸 Channel(通道)

​ Channel是一种全双工的数据通道,不同于Java传统IO中的流只能读或者写,同一个Channel可以进行读也可以进行写,Channel写数据时将数据写入Buffer中,Channel读数据时从Buffer中进行读取。

我们介绍一下常用的几种Channel类型

  • FileChannel:文件访问通道;
  • SocketChannelServerSocketChannel:TCP 通信通道;
  • DatagramChannel:UDP 通信通道;

再来介绍一下两个核心方法:

  • read :读取数据并写入到 Buffer 中。
  • write :将 Buffer 中的数据写入到 Channel 中。
🛥️ Selector(选择器)

​ Selector是NIO中的一个核心组件,一个线程对应一个Selector,Selector可以注册多个Channel,Selector会不断的轮询Channel,然后将有监听事件发生的Channel轮询出来,比如某个Channel上有新的 TCP 连接接入、读和写事件【比如缓冲区存在】,发生事件后通过回调函数将Channel设置成就绪,这个 Channel 就处于就绪状态,会被 Selector 轮询出来。Selector 会将相关的 Channel 加入到就绪集合中。通过 SelectionKey 【类似于上述的描述符】可以获取就绪 Channel 的集合,然后对这些就绪的 Channel 进行相应的 I/O 操作。如下图所示:

在这里插入图片描述

​ 上述我们提到了监听的事件,我们可以通过SelectionKey枚举类来获取事件,Selector监听的事件分为如下几种:

  • SelectionKey.OP_ACCEPT:表示通道接收连接事件,用于ServerSocketChannel
  • SelectionKey.OP_CONNECT:表示完成通道完成连接事件,用于SocketChannel
  • SelectionKey.OP_READ:表示通道准备好进行读取的事件,即有数据可读,也就是说。
  • SelectionKey.OP_WRITE:表示通道准备好进行写入的事件,即可以写入数据。

我们可以通过所对应的Channel.register(selector, selectionKey)注册channel到选择器并绑定发生某个事件时,channel就绪

​ Selector在Java的NIO中是一个抽象的类,可以通过调用Selector.open()方法获取实例,获取到的实例有三个集合,我们分别来解释一下。

  • 所有的SelectionKey集合:表示所有被注册到Selector上的Channel,可以通过keys()方法获取
  • 所有就绪的SelectionKey集合:表示所有,有事件发生的Channel,需要进行IO处理的Channel,通过selectedKeys()获取
  • 被删除的SelectionKey集合:代表了所有被取消注册关系的 Channel,在下一次执行 select() 方法时,这些 Channel 对应的 SelectionKey 会被彻底删除,程序通常无须直接访问该集合,也没有暴露访问的方法。
🚍 案例

我们下面通过一个案例来说明

NIO服务端

public class NIOServer {

    public static void main(String[] args) throws Exception {

        try(Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();){
            // 绑定监听端口号
            serverSocketChannel.bind(new InetSocketAddress(8888));
            // 设置非阻塞
            serverSocketChannel.configureBlocking(false);
            // 注册接收连接事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("等待连接...");
            while (true) {
                // 【阻塞】监听是否有事件发生
                int count = selector.select();
                if (count > 0) {
                    // 发生事件后获取就绪key集合,拿到有新连接后的Channel对应的SelectionKey
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();

                        // isAcceptable() 是用于判断通道是否可以接受新连接的状态方法【处理新连接事件】
                        if (key.isAcceptable()) {

//                        当key.isAcceptable为true时,调用key.channel(),返回的channel和我们上面定义的其实是同一个
//                        ServerSocketChannel server = (ServerSocketChannel) key.channel();

                            // 服务端channel接收客户端SocketChannel
                            SocketChannel clientSocketChannel = serverSocketChannel.accept();

                            clientSocketChannel.configureBlocking(false);

                            // 注册读事件,读事件发生时,会将对应的selectionKey加入到就绪集合中
                            clientSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
                            System.out.printf("连接成功!接收【%s】的连接\n", clientSocketChannel.getRemoteAddress());
                        }

                        // 用于判断key的Channel是否已准备好读取。【处理读事件】
                        if(key.isReadable()){
                            // 处理读操作
                            handleRead(key);
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            // 设置可写,接下来将进行写操作
                            socketChannel.register(selector, SelectionKey.OP_WRITE);

                        }
                        // 用于判断key的Channel是否已准备好写。【处理写事件】
                        if(key.isWritable()){
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            socketChannel.write(ByteBuffer.wrap("我已经处理你的消息".getBytes(StandardCharsets.UTF_8)));

                            // 将客户端通道注册到 Selector 并重新监听读事件,不然这里会一直写,值得缓冲区满
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        iterator.remove();
                    }
                }

            }
        }
    }


    /**
     * 处理读事件
     * @param key
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        // 创建缓冲区对象【默认是写模式】
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        int byteSize = channel.read(byteBuffer);
        if(byteSize > 0){
            // 切换读模式
            byteBuffer.flip();
            String msg = new String(byteBuffer.array(), 0, byteSize);
            System.out.printf("处理线程:【%s】--处理来自【%s】消息:【", Thread.currentThread().getName(), channel.getRemoteAddress());
            System.out.println(msg + "】");

        }else if(byteSize == -1){
            System.out.printf("关闭【%s】的连接\n", channel.getRemoteAddress());
            // 关闭连接
            channel.close();
            // 取消注册
            key.channel();
        }
    }
}

NIO客户端

public class NIOClient {
    public static void main(String[] args) {

        try(SocketChannel socketChannel = SocketChannel.open();
            Scanner scanner = new Scanner(System.in)){
            // 连接到服务器
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            System.out.println("发送消息(退出请输入quit):");
            while (scanner.hasNext()){
                String msg = scanner.nextLine();
                if ("quit".equals(msg)){
                    break;
                }
                socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
                System.out.printf("发送成功!消息内容:【%s】\n",msg);
                handleRead(socketChannel);
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }

    }
    private static void handleRead(SocketChannel channel) throws IOException {

        // 创建缓冲区对象【默认是写模式】
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        int byteSize = channel.read(byteBuffer);
        if(byteSize > 0){
            // 切换读模式
            byteBuffer.flip();
            String msg = new String(byteBuffer.array(), 0, byteSize);
            System.out.printf("收到了来自【%s】响应:【", channel.getRemoteAddress());
            System.out.println(msg + "】");
        }
    }
}

​ 通过上述我们可以看到运行结果,可以同时处理两个客户端的请求,不用等待一个客户端断开连接才能处理另外一个客户端。

在这里插入图片描述

​ 我们来梳理一下上述代码流程:

  1. 首先我们在服务端声明一个SelectorServerSocketChannel,将ServerSocketChannel设置成非阻塞,绑定端口号,注册监听事件
  2. 然后通过调用selector.select()方法,获取到所有就绪channel对应的selectionKey集合,然后遍历该集合,通过key判断是否可以进行读写操作,是否有新连接可以建立,针对事件进行处理,处理完之后将就绪Key在就绪集合中移除,避免重复处理
  3. 在客户端声明一个SocketChannel,然后通过SocketChannel.connect()方法与服务端建立连接,然后将数据写到缓冲区,再通过ByteBuffer.wrap()方法将缓冲区数据发送给服务端
  4. 服务端监听到SocketChannel的数据后,会将对应客户端的channel标记从可读状态,然后selector轮循到可读状态的channel后,会将其SelectionKey加入到集合中,然后对SelectionKey判断是否可读,然后进行读处理,处理完之后设置该Channel监听可写,然后轮询到可写后,进行写操作,写完之后再将其设置成监听可读。
🚩 源码解析

​ 当我们使用selector.select()方法时,我们点进去其源码查看这里以JDK17为准,如图所示:

在这里插入图片描述

​ 我们继续进入该方法:

在这里插入图片描述

​ 这里我们看到有两个实现类,分别解释一下

  • WEPollSelectorImpl:主要用于类Unix系统(如Linux),基于 epoll系统调用,这是Linux内核提供的高效IO多路复用机制。
  • WindowsSelectorImpl:基于 Windows Sockets API,使用 IOCP(I/O Completion Ports)进行高效的异步IO处理。

这里我们以WEPollSelectorImpl举例,进去查看,我们重点关注这几行

在这里插入图片描述

​ 进入processUpdateQueue()方法中

在这里插入图片描述

updateKeys属性就是更新后的需要监听的channel对应的所有key;从上述代码int fd = ski.getFDVal();中我们可以看出来,我们Java中一个selectionKey对应一个chennel对应一个文件描述符,我们得到对应的文件描述符后,代码WEPoll.ctl()方法实际上就是设置epoll系统调用请求的资源,这个方法processUpdateQueue()大概就是说将我们最新的需要监听的channel,也就是需要获取的IO资源,设置在一个epoll系统调用里,方便后续发起epoll系统调用。

​ 我们再进入processDeregisterQueue()方法中
在这里插入图片描述

​ 该方法就是将取消注册的key从监听的Key集合中删除

​ 我们再来看WEPoll.wait()这个方法,这个方法实际上就是对epoll_wait()【操作系统中发起epoll调用的方法】的封装,所以我们可以得出结论,Java的NIO本质上是使用的epoll系统调用来实现的。调用这个方法后,epoll_wait 系统调用会阻塞当前线程,直到有事件发生或超时时间到达。并且epoll_wait 会返回实际发生的事件数量。

​ 最后我们再来看processEvents()这个方法,如图所示

在这里插入图片描述

​ 这个方法我们会对返回的事件进行一个处理,将其加入到我们的一个publicSelectedKeys集合中,也就是我们通过selector.selectedKeys();返回加入到publicSelectedKeys集合中的个数。这时候我们就可以通过业务代码对我们对应的channel进行处理了。


http://www.kler.cn/a/375597.html

相关文章:

  • LabVIEW程序员赚钱不仅限于上班
  • etcd多实例配置
  • Oracle与SQL Server的语法区别
  • 封装一个请求的hook(react函数组件)
  • 数据结构算法学习方法经验总结
  • 统计数据集的TXT、XML及JSON标注文件中各类别/每个标签的数量
  • CSS3简介(一)
  • 解决项目中图片出不来的bug
  • ubuntu启动慢,如何看启动耗时分布
  • 分布式 ID 生成策略(二)
  • Redis 内存回收策略小结
  • Spark中的常见算子
  • ubuntu22-安装vscode-配置shell命令环境-mac安装
  • Serverless + AI 让应用开发更简单
  • 报错:npm : 无法加载文件 C:\Program Files\nodejs\npm.ps1,因为在此系统上禁止运行脚本。
  • vue3-element-admin 去掉登录
  • Docker Compose入门学习——下载、授权、创建文件、定义服务
  • 创建一个基于SSM(Spring, Spring MVC, MyBatis)的教学视频点播系统
  • Sigrity Power SI Multiple Structure Simulation模式如何进行跨板级联仿真操作指导(一)
  • npm install -g @vue/cil 非常卡慢
  • linux alsa-lib snd_pcm_open函数源码分析(一)
  • 腾讯云数据库TDSQL:数据库界的“高架桥”
  • 【论文阅读】Associative Alignment for Few-shot Image Classification
  • ESP-IDF HTTP POST请求发送音频-启明云端乐鑫代理商
  • 【机器学习】21. Transformer: 最通俗易懂讲解
  • 优化低代码开发平台用户体验:功能树导航设计探讨