IO详解(BIO、NIO、实战案例、底层原理刨析)
文章目录
- IO详解(BIO、NIO、实战案例、底层原理刨析)
- 🌎 IO
- 🪐 同步、异步、阻塞、非阻塞
- ⚡ BIO
- 👽 简介
- 😎 案例
- 🚀 NIO
- ✈️ 介绍
- 🚗 Buffer(缓冲)
- 🛸 Channel(通道)
- 🛥️ Selector(选择器)
- 🚍 案例
- 🚩 源码解析
源码地址: IO模型详解.md · 小Liu/IO模型学习 - Gitee.com;
IO详解(BIO、NIO、实战案例、底层原理刨析)
文章理解若有误,烦请指出
🌎 IO
我们先来了解一下IO,我们知道冯诺依曼结构体系中,计算机分为:计算器、控制器、存储器、输入设备、输出设备。我们平常所说的IO其实就是输入设备将数据交给CPU和内存,CPU和内存把处理过后的数据交给输出设备,这个过程就叫做IO
我们知道在操作系统中,一个进程的地址空间划分为用户空间和内核空间,从应用程序的视角来看的话,我们的应用程序对操作系统的内核发起 IO 请求调用(系统调用),操作系统负责的内核执行具体的 IO 操作。也就是说,我们的应用程序实际上只是发起了 IO 操作的请求调用而已,具体 IO 的执行是由操作系统的内核来完成的。
🪐 同步、异步、阻塞、非阻塞
同步、异步、阻塞、非阻塞的概念是很容易搞混的,同步和异步是对于被调用者来说的。而阻塞和非阻塞是对于调用者来说的。
- 同步:比如A调用B,而B不会立马返回响应,B处理完成之后才会给A返回响应,这种情况就叫做同步。
- 异步:比如A调用B,B立马给A返回响应,告诉A我正在处理了,等B处理完之后,B会再告诉A,我处理完了,这就叫异步。
- 阻塞:比如A调用B,A会被挂起一直等待B处理完成,才进行别的工作任务,这就是阻塞
- 非阻塞:比如A调用B,A不会被挂起一直等待B处理完成,而是去执行别的操作,A可以通过轮询、提供回调函数给B,或者监听一个消息队列或者时间,B完成工作后将结果放入到消息队列中,来看B是否完成工作,这就是非阻塞
通过同步、异步、阻塞、非阻塞进行组合,我们可以得出以下三种经典的IO模型:BIO
(同步阻塞)、NIO
(同步非阻塞)、AIO
(异步非阻塞)、理论上来说异步阻塞是不存在的,但是其实在业务场景中异步阻塞是存在的,我们来个举例说明
异步阻塞【业务上的】:线程A先调用B,B立马返回响应给A后,B执行真正的任务,这时候A收到了响应,继续执行,A再调用了C,C也立马返回响应给A后,C执行真正的任务,这时候将A去循环阻塞的轮询B和C,假设这时候C完成了,A轮询到了之后,对C进行处理,然后继续轮询,知道B也完成后,A才结束阻塞。
总结:同步和异步是相对于两个线程是否在同一时间做不同的事。但是阻塞和非阻塞在理论上和业务上有点细微的差别
-
理论上来说:阻塞和非阻塞是指一个线程是否被挂起,而同步和异步是指两个线程的调用顺序和交互方式,所以理论上来说异步阻塞不存在
-
业务上来说:阻塞是指一个线程是否在等待某个事情的完成,而不一定是线程被挂起,所以说业务上来说异步阻塞是存在的
接下来我们来分别了解一下IO模型。
⚡ BIO
👽 简介
我们看一下BIO
的一个模型图,在Java中,像我们平时使用的IO流就是一种同步阻塞的IO,在执行read
操作的时候,线程会阻塞,等待内核把数据准备好了,线程才继续执行。
我们来了解一下什么是read:read是一种系统调用,用于读取文件描述符
对应的数据【注意,读取socket读取数据的系统调用是recvfrom
】,如果描述符没有数据可读,read
调用会阻塞,直到有数据可用或文件被关闭,这也是为什么BIO会阻塞的原因
下面我们通过网络通讯的角度来写一个同步阻塞的案例
😎 案例
-
BIO
服务端/** * 同步阻塞IO服务端 * @author Liu Hanlin * @create 2024-10-25 0:30 */ public class BIOServer { public static void main(String[] args) { try (ServerSocket serverSocket = new ServerSocket(8888)) { System.out.println("【服务端】等待连接中..."); while (true){ // 阻塞等待连接,收到连接后继续执行 Socket clientSocket = serverSocket.accept(); System.out.printf("【服务端】收到【客户端:%s】的连接\n", clientSocket.getRemoteSocketAddress()); handle(clientSocket); } } catch (IOException e) { throw new RuntimeException(e); } } public static void handle(Socket socket) throws IOException { try (InputStream inputStream = socket.getInputStream()) { System.out.println("开始处理==="); byte[] bytes = new byte[1024]; while ( (inputStream.read(bytes)) != -1){ System.out.printf("收到【客户端】消息:%s\n", new String(bytes, StandardCharsets.UTF_8)); } }catch (Exception e){ System.out.println(e.getMessage()); } } }
-
BIO
客户端/** * 阻塞IO客户端 * @author Liu Hanlin * @create 2024-10-25 0:59 */ public class BIOClient { public static void main(String[] args) throws IOException { OutputStream outputStream = null; try (Socket socket = new Socket("127.0.0.1", 8888)) { Scanner scanner = new Scanner(System.in); System.out.println("输入发送消息(exit退出):"); while (scanner.hasNext()){ String msg = scanner.next(); if("exit".equals(msg)){ scanner.close(); break; } outputStream = socket.getOutputStream(); outputStream.write(msg.getBytes()); System.out.printf("发送消息:%s\n", msg); } } catch (Exception e) { System.out.println(e.getMessage()); }finally { if (outputStream != null){ outputStream.close(); } } } }
我们运行上述可看到
启动一个客户端可以正常连接发送,我们再把客户端2启动,发现服务器端此时收不到客户端2的消息,因为此时是阻塞的,当我们把客户端1停止的时候,我们可以看到如下,客户端2 发送的消息瞬间就被接收到并且处理了。
上述案例可以看到,我们使用传统的Socket
进行网络连接请求发送的时候,一个线程只能处理一个客户端,需要完全处理完这个客户端后才能下一个客户端,这种情况如果在客户端数量非常多的时候,那么就会导致某些客户端响应速度过慢,接下来我们来介绍一些NIO
🚀 NIO
✈️ 介绍
在Java中NIO可以看作同步非阻塞IO模型,也可以看做IO多路复用模型,我们先来看一下同步非阻塞的模型图,我们可以看到,线程会调用内核进行IO读取,但是在内核准备数据的时候,线程并没有阻塞,而是一直在反复调用read
,那为什么同步非阻塞模型在资源还没有准备好的时候进行read
系统调用不回阻塞呢,因为在次模型中,如果资源没有准备好,内核会快速返回一个erroy
来表示资源还没有准备好,这样应用程序就不会被阻塞。
接下来我们再来看看IO多路复用技术的模型图
在IO多路复用模型
中我们可以看到,应用程序发起IO不再是使用read
系统调用,而是使用select
、poll
、epoll
,我们来解释一下这三个系统调用。
- select:将所有的连接(io连接或者socket连接)都放到一个
描述符集合
里,然后通过select
系统调用将描述符集合拷贝的内核中,内核来遍历检查描述符
资源是否准备好,如果有准备好的描述符,就将其拷贝回用户空间中,然后用户空间再将此描述符集合遍历,拿到准备好的描述符
资源进行读写> - poll:poll实际上和select没有太大的本质差别,差别在于poll存储描述符采用
动态数组
、链表
来存储 - epoll:在内核中用了一颗红黑树来存储来存储所有需要监听的
描述符
,像select
、poll
新增监听的描述符时,需要把新增的描述符放到描述符集合里,然后再把整个集合传给内核,而epoll
红黑树之后,只需要将新增监听的描述符传给内核,内核将新增的描述符添加到红黑树中。epoll
在内核中还维护了一个链表用来存放就绪的描述符
,当某个描述符有事件发生之后,通过回调函数的方式将其注册到就绪的描述符链表中
,而用户态和内核之间传输就只用传输这个就绪的描述符链表
,而且用户进程也不需要再遍历去查找就绪的描述符
。
上述我们讲到了NIO的细节,我们在Java中提供了一个NIO的包,其实现原理也和上述IO多路复用类似。在Java的NIO中有三个核心的概念
-
Buffer(缓冲区):NIO 读写数据都是通过缓冲区进行操作的。读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中。
-
Channel(信道):Channel 是一个双向的、可读可写的数据传输通道,NIO 通过 Channel 来实现数据的输入输出。通道是一个抽象的概念,它可以代表文件、套接字或者其他数据源之间的连接。
-
Selector(选择器):允许一个线程处理多个 Channel,基于事件驱动的 I/O 多路复用模型。所有的 Channel 都可以注册到 Selector 上,由 Selector 来分配线程来处理事件。
我们来详细介绍一下
🚗 Buffer(缓冲)
Buffer可以理解成一个数组,用来存储数据,在读写数据的时候,都是对Buffer进行操作,Buffer有几个重要的概念,我们来看一下
public abstract class Buffer {
// 属性满足的关系: mark <= position <= limit <= capacity
// 允许讲位置直接定位到该标记出,可选项
private int mark = -1;
// 下一个可以被读写的数据的位置,读写模式切换时,会归零
private int position = 0;
// 读写的边界,写模式下代表最多能写入的数据,通常等于capatity,读模式下表示buffer中数据的实际长度
private int limit;
// Buffer可以存储的最大数据量,创建时设置且不可改变;
private int capacity;
}
我们来看一下读写模式分别的图解
Buffer
对象不能通过 new
调用构造方法创建对象 ,只能通过静态方法实例化 Buffer
。
这里以 ByteBuffer
为例进行介绍:
// 分配堆内存
public static ByteBuffer allocate(int capacity);
// 分配直接内存
public static ByteBuffer allocateDirect(int capacity);
常用方法:
get
: 读取缓冲区的数据put
:向缓冲区写入数据
除上述两个方法之外,其他的重要方法:
flip
:将缓冲区从写模式切换到读模式,它会将limit
的值设置为当前position
的值,将position
的值设置为 0。clear
: 清空缓冲区,将缓冲区从读模式切换到写模式,并将position
的值设置为 0,将limit
的值设置为capacity
的值。
🛸 Channel(通道)
Channel是一种全双工的数据通道,不同于Java传统IO中的流只能读或者写,同一个Channel可以进行读也可以进行写,Channel写数据时将数据写入Buffer中,Channel读数据时从Buffer中进行读取。
我们介绍一下常用的几种Channel类型
FileChannel
:文件访问通道;SocketChannel
、ServerSocketChannel
:TCP 通信通道;DatagramChannel
:UDP 通信通道;
再来介绍一下两个核心方法:
read
:读取数据并写入到 Buffer 中。write
:将 Buffer 中的数据写入到 Channel 中。
🛥️ Selector(选择器)
Selector是NIO中的一个核心组件,一个线程对应一个Selector,Selector可以注册多个Channel,Selector会不断的轮询Channel,然后将有监听事件发生
的Channel轮询出来,比如某个Channel上有新的 TCP 连接接入、读和写事件【比如缓冲区存在】,发生事件后通过回调函数将Channel设置成就绪,这个 Channel 就处于就绪状态,会被 Selector 轮询出来。Selector 会将相关的 Channel 加入到就绪集合中。通过 SelectionKey 【类似于上述的描述符】
可以获取就绪 Channel 的集合,然后对这些就绪的 Channel 进行相应的 I/O 操作。如下图所示:
上述我们提到了监听的事件,我们可以通过SelectionKey
枚举类来获取事件,Selector监听的事件分为如下几种:
SelectionKey.OP_ACCEPT
:表示通道接收连接事件,用于ServerSocketChannel
SelectionKey.OP_CONNECT
:表示完成通道完成连接事件,用于SocketChannel
SelectionKey.OP_READ
:表示通道准备好进行读取的事件,即有数据可读,也就是说。SelectionKey.OP_WRITE
:表示通道准备好进行写入的事件,即可以写入数据。
我们可以通过所对应的Channel.register(selector, selectionKey)
注册channel到选择器并绑定发生某个事件时,channel就绪
Selector在Java的NIO中是一个抽象的类,可以通过调用Selector.open()
方法获取实例,获取到的实例有三个集合,我们分别来解释一下。
- 所有的SelectionKey集合:表示所有被注册到
Selector
上的Channel
,可以通过keys()
方法获取 - 所有就绪的SelectionKey集合:表示所有,有事件发生的
Channel
,需要进行IO处理的Channel
,通过selectedKeys()
获取 - 被删除的SelectionKey集合:代表了所有被取消注册关系的
Channel
,在下一次执行select()
方法时,这些Channel
对应的SelectionKey
会被彻底删除,程序通常无须直接访问该集合,也没有暴露访问的方法。
🚍 案例
我们下面通过一个案例来说明
NIO服务端
public class NIOServer {
public static void main(String[] args) throws Exception {
try(Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();){
// 绑定监听端口号
serverSocketChannel.bind(new InetSocketAddress(8888));
// 设置非阻塞
serverSocketChannel.configureBlocking(false);
// 注册接收连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("等待连接...");
while (true) {
// 【阻塞】监听是否有事件发生
int count = selector.select();
if (count > 0) {
// 发生事件后获取就绪key集合,拿到有新连接后的Channel对应的SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// isAcceptable() 是用于判断通道是否可以接受新连接的状态方法【处理新连接事件】
if (key.isAcceptable()) {
// 当key.isAcceptable为true时,调用key.channel(),返回的channel和我们上面定义的其实是同一个
// ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 服务端channel接收客户端SocketChannel
SocketChannel clientSocketChannel = serverSocketChannel.accept();
clientSocketChannel.configureBlocking(false);
// 注册读事件,读事件发生时,会将对应的selectionKey加入到就绪集合中
clientSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
System.out.printf("连接成功!接收【%s】的连接\n", clientSocketChannel.getRemoteAddress());
}
// 用于判断key的Channel是否已准备好读取。【处理读事件】
if(key.isReadable()){
// 处理读操作
handleRead(key);
SocketChannel socketChannel = (SocketChannel) key.channel();
// 设置可写,接下来将进行写操作
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
// 用于判断key的Channel是否已准备好写。【处理写事件】
if(key.isWritable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(ByteBuffer.wrap("我已经处理你的消息".getBytes(StandardCharsets.UTF_8)));
// 将客户端通道注册到 Selector 并重新监听读事件,不然这里会一直写,值得缓冲区满
socketChannel.register(selector, SelectionKey.OP_READ);
}
iterator.remove();
}
}
}
}
}
/**
* 处理读事件
* @param key
*/
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
// 创建缓冲区对象【默认是写模式】
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int byteSize = channel.read(byteBuffer);
if(byteSize > 0){
// 切换读模式
byteBuffer.flip();
String msg = new String(byteBuffer.array(), 0, byteSize);
System.out.printf("处理线程:【%s】--处理来自【%s】消息:【", Thread.currentThread().getName(), channel.getRemoteAddress());
System.out.println(msg + "】");
}else if(byteSize == -1){
System.out.printf("关闭【%s】的连接\n", channel.getRemoteAddress());
// 关闭连接
channel.close();
// 取消注册
key.channel();
}
}
}
NIO客户端
public class NIOClient {
public static void main(String[] args) {
try(SocketChannel socketChannel = SocketChannel.open();
Scanner scanner = new Scanner(System.in)){
// 连接到服务器
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println("发送消息(退出请输入quit):");
while (scanner.hasNext()){
String msg = scanner.nextLine();
if ("quit".equals(msg)){
break;
}
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
System.out.printf("发送成功!消息内容:【%s】\n",msg);
handleRead(socketChannel);
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
private static void handleRead(SocketChannel channel) throws IOException {
// 创建缓冲区对象【默认是写模式】
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int byteSize = channel.read(byteBuffer);
if(byteSize > 0){
// 切换读模式
byteBuffer.flip();
String msg = new String(byteBuffer.array(), 0, byteSize);
System.out.printf("收到了来自【%s】响应:【", channel.getRemoteAddress());
System.out.println(msg + "】");
}
}
}
通过上述我们可以看到运行结果,可以同时处理两个客户端的请求,不用等待一个客户端断开连接才能处理另外一个客户端。
我们来梳理一下上述代码流程:
- 首先我们在服务端声明一个
Selector
和ServerSocketChannel
,将ServerSocketChannel
设置成非阻塞,绑定端口号,注册监听事件 - 然后通过调用
selector.select()
方法,获取到所有就绪channel
对应的selectionKey
集合,然后遍历该集合,通过key判断是否可以进行读写操作,是否有新连接可以建立,针对事件进行处理,处理完之后将就绪Key在就绪集合中移除,避免重复处理 - 在客户端声明一个
SocketChannel
,然后通过SocketChannel.connect()
方法与服务端建立连接,然后将数据写到缓冲区,再通过ByteBuffer.wrap()
方法将缓冲区数据发送给服务端 - 服务端监听到
SocketChannel
的数据后,会将对应客户端的channel
标记从可读状态,然后selector
轮循到可读状态的channel
后,会将其SelectionKey
加入到集合中,然后对SelectionKey
判断是否可读,然后进行读处理,处理完之后设置该Channel
监听可写,然后轮询到可写后,进行写操作,写完之后再将其设置成监听可读。
🚩 源码解析
当我们使用selector.select()
方法时,我们点进去其源码查看这里以JDK17
为准,如图所示:
我们继续进入该方法:
这里我们看到有两个实现类,分别解释一下
WEPollSelectorImpl
:主要用于类Unix系统(如Linux),基于epoll
系统调用,这是Linux内核提供的高效IO多路复用机制。WindowsSelectorImpl
:基于 Windows Sockets API,使用IOCP
(I/O Completion Ports)进行高效的异步IO处理。
这里我们以WEPollSelectorImpl
举例,进去查看,我们重点关注这几行
进入processUpdateQueue()
方法中
updateKeys
属性就是更新后的需要监听的channel
对应的所有key
;从上述代码int fd = ski.getFDVal();
中我们可以看出来,我们Java中一个selectionKey
对应一个chennel
对应一个文件描述符,我们得到对应的文件描述符后,代码WEPoll.ctl()
方法实际上就是设置epoll
系统调用请求的资源,这个方法processUpdateQueue()
大概就是说将我们最新的需要监听的channel
,也就是需要获取的IO资源,设置在一个epoll
系统调用里,方便后续发起epoll
系统调用。
我们再进入processDeregisterQueue()
方法中
该方法就是将取消注册的key
从监听的Key
集合中删除
我们再来看WEPoll.wait()
这个方法,这个方法实际上就是对epoll_wait()【操作系统中发起epoll调用的方法】
的封装,所以我们可以得出结论,Java的NIO本质上是使用的epoll
系统调用来实现的。调用这个方法后,epoll_wait
系统调用会阻塞当前线程,直到有事件发生或超时时间到达。并且epoll_wait
会返回实际发生的事件数量。
最后我们再来看processEvents()
这个方法,如图所示
这个方法我们会对返回的事件进行一个处理,将其加入到我们的一个publicSelectedKeys
集合中,也就是我们通过selector.selectedKeys();
返回加入到publicSelectedKeys
集合中的个数。这时候我们就可以通过业务代码对我们对应的channel
进行处理了。