IO的类型(BIO、NIO、AIO)
概览
首先,我们需要明白什么是IO?IO从字面意思可以理解为输入输出,对于Java进程来讲,输入就是将数据输入到Java内存的缓存区,供Java读取使用,输出就是Java进程将数据写入到缓存区,进而输出到文件或网络数据流。IO有两种:File IO和Stream IO,其中File IO缓存区图如下:
I/O就是对缓存区的读写,可以看出InputStream.read()实际就是调用底层的“read()",将内核缓冲区的数据写入到Java进程的缓冲区,而OutputStream.write()就是调用底层的"write()"将Java进程缓冲区的数据写入到内核缓冲区。I/O的意义就是缓存区之间的读写,进而对文件或流数据的读写。
阻塞、同步、异步
我们经常可以看到这些IO的定义: BIO是同步阻塞、NIO是同步非阻塞、AIO是异步非阻塞。那么什么是同步,什么异步,什么是阻塞。我觉得有广义和IO层面的两种定义,它们是不一样的。
广义上的阻塞和同步
阻塞:调用者(主线程或进程)调用一个阻塞方法,则调用者线程会一直运行该方法,直到这个方法运行完才会往下执行,当该方法很慢或线程挂起了,则调用者看起来就是被阻塞了。
非阻塞:调用者(主线程或进程)调用一个非阻塞方法,则该方法无需运行完所有逻辑就会返回状态值给调用者,调用者可以直接往下执行无需等待方法的所有逻辑完成。
同步:串行执行,调用者依次往下执行方法,上一个方法完成了才会执行下一个方法。
异步:并行执行,调用者调用方法,可以另开线程甚至进程执行,方法可以并行执行,下一个方法无需等待上个方法完成。
IO上的阻塞和同步
要理解IO定义的阻塞和同步,就得提底层原理和代码了。因为IO里的同步阻塞涉及到缓冲区的各阶段的运行。
缓冲区
缓冲区有非直接缓冲区和直接缓冲区,它们都是用来保存从磁盘读取的数据或要写入磁盘的数据。
非直接缓冲区:应用程序和内核读写的缓存区是在不同的内存块上,需要复制才能同步数据。具体图示如下:
由图可看出,Java等应用程序调用read方法,内核会读取磁盘或数据流到内核缓冲区上,然后内和缓冲区需要复制到Java等应用程序缓冲区,最后Java等应用程序代码才能通过应用缓冲区读取到数据。write方法类似。上面缓冲区到内存块是有个逻辑映射的,上图已省略。
直接缓冲区:应用程序与内核读写的缓存区是同一片物理内存。具体图示如下:
由图可看出,Java等应用程序调用read方法,内核会读取磁盘或数据流复制到缓冲区上,然后直接返回给应用程序,即应用程序和内核读取的缓冲区对应的物理内存块是同一个,即这个缓冲区既是内核缓冲区,也是用户应用缓冲区。write方法类似。上面缓冲区到内存块是有个逻辑映射的,上图已省略。
非直接缓冲区将内核与应用程序的物理缓冲区区分开来,操作系统内核避免了干扰也更安全,但读写效率低。直接缓存区虽然读写效率高,但不安全,用户程序可以影响操作系统内核。Java的IO能使用两种缓存区。但无论哪一种缓存区,都是先需要磁盘或网络数据流读取到缓冲区,缓冲区数据再读取给Java应用代码,写入也一样,需要经过缓冲区过渡。
系统调用
我们查看Java的InputStream.read()或OutputStream.write()方法,可以看到它是调用了native方法,实际最后是调用了操作系统的recvform等方法。
Java进程调用系统内核进程的recvform等方法,操作系统并不是立即就将数据写入缓冲区,而是会等待,比如等待TCP粘包,等待磁针移动到相应数据的扇区,等这些准备工作都完成了后,内核才会将数据复制到缓冲区,最后Java进程才能得到返回值获取缓冲区的数据。所以,I/O有两个步骤:准备(等待)、复制。
IO里的阻塞同步就是指这调用底层IO的这两个阶段的处理方式不同。
阻塞I/O
Java应用进程的I/O方法调用系统内核的recvfrom()等方法时,参数里声明了阻塞(用参数MSG_WAITALL和MSG_DONTWAIT控制),则系统内核在准备阶和复制阶段会一直阻塞Java进程,直到返回数据。
可以看出Java等应用程序在执行IO时,准备和复制阶段都是阻塞的。
非阻塞I/O
Java应用进程的I/O方法调用系统内核的recvfrom() 等方法时,参数设置为不阻塞,则为内核不会阻塞进程,等数据都准备好了,则调用recvfrom()进行阻塞复制。具体如下图:
可以看出Java等应用程序在执行IO时,准备阶段是非阻塞,可以轮询查询是否可读写状态,但在复制阶段是阻塞的。
所以,Java里I/O的阻塞和非阻塞区别就是IO在等待准备阶段是否是阻塞的。根本原因是调用操作系统的recvfrom() 等方法声明的阻塞参数不同。
同步I/O
以上应用程序的IO在准备阶段进行等待也好、轮询也好,多路复用IO也好、还有信号驱动IO也好,它们在复制阶段都是阻塞的。即应用程序仍然会被阻塞,需要等待完成完整的IO才能拿到结果往下执行,因此叫做同步IO。具体图示如下:
异步I/O
应用程序的IO在准备和复制阶段都不会阻塞,IO的结果可以用回调函数进行处理,IO和应用进程完全不干扰,因此被称为异步IO。具体图示如下:
所以,Java里I/O的同步和异步区别就是IO在复制阶段是否是阻塞的。根本原因是调用操作系统的IO方法不同。
BIO
BIO称为Block IO,即阻塞IO。它是同步阻塞的,即在准备和复制阶段都是阻塞的!Java应用在底层调用完了recvfrom()等方法一直阻塞等待缓冲区数据复制完成即可。
BIO网络编程代码如下:
package com.longqi.boottest.io;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LongQi
* @projectName boot-integration
* @description: TODO
* @date 2023/3/29 11:04
*/
public class BioApplication {
public static void main(String[] args) throws Exception{
ThreadPoolExecutor serverPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
new Thread(new Runnable() {
@Override
public void run() {
try {
ServerSocket server = new ServerSocket(8080);
while (true) {
// 每个客户端返回的socket是不一样的,这里的server.accept()需要等待客户端数据到服务器内核的缓冲区的IO完成。循环一次只能读取一个客户端数据。
Socket socket = server.accept();
// 线程池用线程去读取客户端传来的信息并打印,不至于1个客户端的传来的数据量巨大阻塞后续的客户端请求。
serverPool.submit(new Runnable() {
@Override
public void run() {
try {
InputStream in = socket.getInputStream();
char[] buffer = new char[1024];
StringBuilder strBuilder = new StringBuilder();
BufferedReader bufReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
int bytesRead = -1;
while ((bytesRead = bufReader.read(buffer)) > 0) {
strBuilder.append(buffer, 0, bytesRead);
}
System.out.println("收到客户端"+socket.getPort()+"的信息:"+strBuilder.toString());
} catch (Exception e) {
System.out.println("服务handler处理异常");
}
}
});
}
} catch (Exception e) {
System.out.println("服务器启动失败");
}
}
}).start();
Thread.sleep(2000);
ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
for(int i=0;i<300;i++){
clientPool.submit(new Runnable() {
@Override
public void run() {
try{
Socket client = new Socket();
client.connect(new InetSocketAddress("127.0.0.1",8080));
client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
// 注意TCP粘包,这里不调用close,服务器是收不到消息的
client.close();
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
});
}
}
}
以上可以看出,每次server.accept(),需要等待内核把客户端数据写到缓冲区完成,每次while循环只能处理一个IO,为了使服务端处理请求互不影响且性能提高,这里使用线程读取缓冲区数据,进行后续处理。
NIO
BIO称为No-Block IO,即非阻塞IO。它是同步非阻塞的,即在准备阶段非阻塞,复制阶段阻塞!Java应用在底层调用完了recvfrom()等方法时传非阻塞参数,轮询获取状态,一直获取到可读可写状态才进入阻塞复制阶段对缓冲区进行复制读取或写入。针对于准备阶段的查询状态,NIO主要有三种方案:应用进程轮询、多路复用(系统内核轮询或事件驱动)、信号驱动。
应用进程轮询
即Java进程的IO线程轮询调用内核的非阻塞recvfrom()方法,获取到可读可写状态后才进入阻塞读写。该方案和BIO一样,有多少个IO就要建多少个线程,且轮询调用内核方法花费比较高(每次调用方法都要建立端口连接进行通信),且大量CPU时间被浪费,整体花费的成本比BIO还高!虽然这种方案对于应用IO线程来讲等待阶段是非阻塞的,但不会有哪个厂家使用这种方案来实现NIO。
多路复用
即系统内核提供函数轮询多个IO操作的状态或者通过事件驱动监控IO的状态,并将状态返回。这里不需要应用进程起那么多线程去轮询各个IO。比如同时有1000个IO操作,我们对比下应用进程轮询和多路复用:
应用进程轮询:需要应用进程起1000个IO线程,每个线程都要发起轮询去调用系统内核的recvfrom()方法,消耗的CPU和应用线程数量都是巨大的,且每个线程轮询请求返回的信息大部分是无用的。
多路复用:应用进程只需起1个IO线程,将这1000个IO操作都注册到系统内核的select/poll/epoll方法上,然后该线程轮询调用内核的select或poll方法。内核的select/poll会轮询它内部的1000个IO的状态返回,epoll会通过事件回调更新各IO状态直接将IO状态返回。这样,只需1个应用线程就可以监控1000个IO操作!应用线程大大减少,CPU也充分利用了,毕竟每次轮询可能有可读或可写状态返回,进而被应用进程利用进而发起阻塞复制操作。
注意:64位linux的select方法由于内部维护一个2048长度的数组(32位是1024),有2048个IO数量限制,即最多监控2048的IO操作,这个2048限制可以修改操作系统来调整容量,而poll方法由于内部维护一个链表,监控的IO数量是无限制的。epoll方法是通过事件机制直接更新缓存的IO状态,被调用时无需去轮询查询各IO的状态,可以直接将缓存的IO状态返回。另外,这些方法不仅可以返回状态,还可以进行读写缓存区操作,读写缓存区操作都是阻塞的。
select一般有1024或2048限制,poll和epoll理论是无限制的,但受操作系统进程的句柄限制。select和poll由于需要遍历N个IO才能得到状态,查询的时间复杂度是O(N),效率会随着IO个数增大而变慢,而epoll由于事件回调更新缓存的状态,查询的复杂度是O(1),基本不受IO个数影响。epoll由于大量连接下查询快,上限高(每增加1G内存可以多打开10万个连接),一般高并发最好选用能支持epoll的操作系统。
NIO网络编程如下:
package com.longqi.boottest.io;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LongQi
* @projectName boot-integration
* @description: TODO
* @date 2023/3/27 16:17
*/
public class NioApplication {
public static void main(String[] args) throws Exception{
ThreadPoolExecutor serverPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
// 设置服务器
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(8082));
Selector selector = Selector.open();
// 设置select监听的事件
server.register(selector, SelectionKey.OP_ACCEPT);
new Thread(new Runnable() {
@Override
public void run() {
try{
while (true){
// 轮询调用select方法,获取ACCEPT状态的IO流
int n = selector.select();
if(n == 0){
continue;
}
// 若有就绪状态的IO流(channel),则迭代读取消息。
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()){
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector,SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
serverPool.submit(
new Runnable() {
@Override
public void run() {
try{
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer)!=-1){
//复位,转化为读模式
buffer.flip();
while (buffer.hasRemaining()){
System.out.println("收到客户端"+channel.socket().getPort()+"的信息:"+StandardCharsets.UTF_8.decode(buffer).toString());
}
//清空缓存区,转化为写模式
buffer.clear();
}
}catch (Exception e){
System.out.println("服务handler处理异常");
}
}
}
);
}
iterator.remove();
}
}
}catch (Exception e){
System.out.println("服务器异常");
}
}
}).start();
Thread.sleep(2000);
ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
for(int i=0;i<300;i++){
clientPool.submit(new Runnable() {
@Override
public void run() {
try{
Socket client = new Socket();
client.connect(new InetSocketAddress("127.0.0.1",8082));
client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
// 注意TCP粘包,这里不调用close,服务器是收不到消息的
client.close();
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
});
}
}
}
可以看出selector.select()这个方法可以查询出多个IO的状态,相对于BIO的server.accept()只能阻塞等待一个IO好多了。且这个selector.select()方法不是阻塞的,会立即返回数据。当然,这里后续这里处理多个客户端的可读数据还是使用了线程池进行处理。
信号驱动
多路复用模型有个缺点,就是应用进程的IO线程还是要轮询调用内核select等方法。CPU依旧被大量占用,而信号驱动是应用进程的IO线程可以将多个IO事件注册到sigaction方法内,让内核监控这些IO,应用IO线程调用完这个sigaction方法就可以干别的事了,当这些IO有变化,则将这些IO的状态变化(SIGIO信号)发送给应用程序的回调函数,回调函数再对IO进行判断处理,若读取到就绪状态,则可以调用recvform函数进行阻塞复制。
虽然理论上信号驱动更好,但实际上SIGIO信号太频繁了,会造成回调应用程序非常频繁,甚至应用程序都没处理完上个信号,下个信号就来了,资源消耗反而比NIO更大,甚至会卡死。因此信号驱动目前基本没啥框架使用,Netty也是基于NIO的多路复用实现的。
AIO
AIO称为Asynchronous IO,即异步非阻塞IO。它是准备和复制阶段都是非阻塞的!Java应用进程调用系统内核的aio_read等方法对操作系统发起IO请求,之后应用进程就可以执行别的代码了,等系统内核自己完成复制数据到缓冲区后,内核会发送信号给应用程序的回调函数。在内核IO过程中,应用系统完全可以做别的事,无需任何阻塞。
虽然AIO对应用程序很美好,但对于内核来讲,实现IO就更复杂了,Linux底层使用epoll来实现AIO,但实现的效果并不是很理想。且加上JDK封装了AIO,Java使用AIO不易深度优化。
Java使用AIO,性能相对于NIO没有优势,另外处理回调函数速度跟不上处理IO速度,会导致缓冲区内存慢慢越积越多,AIO接收数据还必须先分配固定缓存,对于连接数量大流量小的情况,会浪费很多内存。因此,AIO虽然也支持大量连接,但更适合于连接比较长流量大的场景。所以一般的web框架都是使用NIO,比如Netty并没有使用AIO而是用NIO来处理高并发,springboot里的tomcat默认也是使用NIO启动。
总结
BIO和NIO都是复制阶段阻塞,但BIO等待阶段也是阻塞。而AIO在两个阶段都不阻塞。具体API实现方面,它们读取到客户端的数据后,其实都要用线程进行处理提高服务器性能。
BIO适用于连接数小的场景。NIO适合连接数多且连接短的架构,比如聊天服务器,普通web服务器等。AIO适用于连接数多且连接长的架构。