Netty基础—4.NIO的使用简介二
大纲
1.Buffer缓冲区
2.Channel通道
3.BIO编程
4.伪异步IO编程
5.改造程序以支持长连接
6.NIO三大核心组件
7.NIO服务端的创建流程
8.NIO客户端的创建流程
9.NIO优点总结
10.NIO问题总结
4.伪异步IO编程
(1)BIO的主要问题
(2)BIO编程模型的改进
(3)伪异步IO编程
(4)伪异步IO的问题
(5)伪异步IO可能引起的级联故障
(1)BIO的主要问题
BIO的主要问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理新接入的客户端链路,一个线程只能处理一个客户端连接。
比如在上述BIO编程的服务端程序中:SocketServer和一个客户端建立连接后,服务端会一直执行in.read(buf)进行阻塞等待,从而导致服务端只能和这个客户端进行通信。如果有一个新的客户端想和服务端建立连接,那么服务端是不能和这个新的客户端建立TCP连接的,因为服务端还在阻塞执行in.read(buf)而没能执行serverSocket.accept()。
(2)BIO编程模型的改进
为了改进一个线程处理一个连接的BIO模型,可以通过线程池或者消息队列,实现一个或者多个线程处理N个客户端的伪异步IO模型。
此时由于底层通信机制依然使用同步阻塞IO,所以被称为伪异步IO。服务端线程池的最大线程数N和客户端并发访问数M呈N : M的比例关系。改进后的BIO模型能有效防止海量并发接入导致线程资源耗尽。
(3)伪异步IO编程
当有新的客户端接入时,将客户端的Socket封装成一个任务Task(实现Runnable接口)投递到后端的线程池中进行处理。这个线程池中会维护一个消息队列和N个活跃的线程来对消息队列中的任务Task进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此线程资源的占用是可控的,不会导致线程耗尽而宕机。
//服务端(为简洁省略try catch)
public class SocketServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(9000);
ServerHandlerExecutePool singleExcutor = new ServerHandlerExecutePool(50, 1000);
while (true) {
//在这里会阻塞住,一直等待客户端来建立连接
Socket socket = serverSocket.accept();
//获取客户端的连接后,将socket提交给线程池处理
singleExcutor.execute(new ServerHandler(socket));
}
...
}
}
public class ServerHandlerExecutePool {
private ExecutorService executor;
public ServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize)
);
}
public void execute(Runnable task) {
executor.execute(task);
}
}
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStreamReader in = new InputStreamReader(socket.getInputStream());
OutputStream out = socket.getOutputStream();
char[] buf = new char[1024 * 1024];
int len = in.read(buf);
while (len != -1) {
String request = new String(buf, 0, len);
System.out.println("[" + Thread.currentThread().getName() + "]服务端接收到了请求:" + request);
out.write("收到,收到".getBytes());
len = in.read(buf);
}
//释放资源
out.close();
in.close();
//通过TCP四次挥手断开连接
socket.close();
}
}
(4)伪异步IO的问题
一.当对Socket的输入流进行读操作时
也就是调用Java输入流InputStream的read()方法读输入流时,会一直阻塞下去直到可用数据已经读取完毕。这意味着当一方发送请求或者应答消息比较缓慢或者网络传输比较缓慢时,读取输入流的一方的通信线程将被长时间阻塞。如果发送方要60s才能够将数据发送完毕,读取方的IO线程在执行read()方法时会被同步阻塞60s。在此期间,其他的连接任务只能在消息队列中排队。
二.当对Socket的输出流进行写操作时
也就是调用Java输出流OutputStream的write()方法写输出流时,也会一直阻塞下去直到所有要发送的字节全部写入完毕。当消息的接收方处理缓慢时,将不能及时从TCP缓冲区中读取数据。这将会导致发送方的TCP Window Size不断减小,直到为0。最后消息发送方将不能再向TCP缓冲区写入消息,于是write()方法将被阻塞直到TCP Window Size大于0。所以输入流和输出流的读和写操作都是同步阻塞的,阻塞时间取决于对方IO线程的处理速度和网络IO的传输速度。
(5)伪异步IO可能引起的级联故障
一.服务端处理缓慢,返回应答消息耗费60s,平时只需10ms。
二.此时客户端在读取服务端响应,由于读取输入流是阻塞的,客户端将会被同步阻塞60s。
三.假如所有可用线程都被该服务端阻塞,那后续的所有IO消息都将在队列中排队。
四.由于线程池采用阻塞队列实现,当队列积满后,后续入队的操作将会被阻塞。
五.由于只有一个Acceptor线程接收客户端接入,线程池的阻塞队列满了之后,它也会被阻塞。于是新的客户端请求消息将会被拒绝,从而客户端发生大量的连接超时。
因此BIO模型显然无法支持百万并发请求,因为一个线程只能处理一个请求。改进BIO后的伪异步IO模型即便可以通过一个线程处理多个请求,也存在级联故障的问题。
5.改造程序以支持长连接
(1)什么是长连接和短连接
(2)长连接和短连接的代码
(1)什么是长连接和短连接
一.短连接
客户端每次建立一个连接,发送一个请求,获取一个响应,然后就断开连接,就是所谓的短连接。
二.长连接
客户端每次建立一个连接,可以发送很多个请求,一直持续维持这个TCP连接,不断开连接。客户端持续通过这个连接与服务端进行通信,不停地发送数据和请求。服务端也长期维持这个连接,不停地接受请求返回响应。这个就是所谓的长连接,连接存在的时间很长的。所以只要客户端不停地发送请求不释放连接,那么就是长连接了。
(2)长连接和短连接的代码
//客户端(短连接)(为简洁省略try catch)
public class SocketClient {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
Socket socket = new Socket("localhost", 9000);
InputStreamReader in = new InputStreamReader(socket.getInputStream());
OutputStream out = socket.getOutputStream();
//发送数据流,底层拆分为一个一个的TCP包发过去
out.write("你好".getBytes());
char[] buf = new char[1024 * 1024];
int len = in.read(buf);
if (len != -1) {
String response = new String(buf, 0, len);
System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
}
in.close();
out.close();
socket.close();
};
}.start();
}
}
}
//客户端(长连接)(为简洁省略try catch)
public class SocketClient {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
Socket socket = new Socket("localhost", 9000);
InputStreamReader in = new InputStreamReader(socket.getInputStream());
OutputStream out = socket.getOutputStream();
char[] buf = new char[1024 * 1024];
//客户端不停地每隔一秒发送请求,不释放就是长连接了
while (true) {
//发送数据流,底层拆分为一个一个的TCP包发过去
out.write("你好".getBytes());
int len = in.read(buf);
if (len != -1) {
String response = new String(buf, 0, len);
System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
}
Thread.sleep(1000);
}
...
};
}.start();
}
}
}
6.NIO三大核心组件
(1)Buffer缓冲区
(2)Channel数据通道
(3)Selector多路复用器
(4)BIO和IO多路复用的理解
(5)一些概念补充
NIO的三大核心组件分别是:Buffer、Channel、Selector。
(1)Buffer缓冲区
Buffer缓冲区是用来封装数据的,也就是把数据写入到Buffer、或者从Buffer中读取数据。
(2)Channel数据通道
Channel就是一个数据管道,负责传输数据(封装好数据的Buffer),比如把数据写入到文件或网络、从文件或网络中读取数据。
(3)Selector多路复用器
Selector会不断地轮询注册在其上的Channel。如果某个Channel上发生读或写事件,那么这个Channel就处于就绪状态。然后就绪的Channel就会被Selector轮询出来,具体就是通过Selector的SelectionKey来获取就绪的Channel集合。获取到就绪的Channel后,就可以进行后续的IO操作了。
一个Selector多路复用器可以同时轮询多个Channel。由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这意味着只需要一个线程负责Selector多路复用器的轮询,就可以接入成千上万的客户端。
(4)BIO和IO多路复用的理解
由于TCP连接的建立需要经过三次握手,所以可理解为客户端向服务端发起的Socket连接就绪需要经过三次握手请求。服务端接收到客户端的第一次握手请求时,会创建Socket连接(即创建一个Channel)。服务端接收客户端的第三次握手请求,这个Socket连接才算准备好(Channel才算就绪)。
在BIO模型下,只有一次系统调用recvfrom。所以服务端从接收到客户端的第一次握手请求开始,就必须同步阻塞等待直到第三次握手请求的完成,这样才能获取准备好的Socket连接并读取请求数据。
在多路复用模型下,会有两次系统调用select和recvfrom。所以服务端接收到客户端的第一次握手请求后,不必创建线程然后通过阻塞来等待第三次握手请求的完成,而是可以直接通过轮询Selector(基于select系统调用),来获取所有已经完成第三次握手请求(已就绪)的客户端Socket连接,之后对这些Socket连接进行recvfrom系统调用时不需要阻塞也能马上读取到请求数据了。
(5)一些概念补充
一.异步非阻塞IO
NIO类库支持非阻塞的读和写操作。相比BIO同步阻塞的读和写操作,NIO是异步的,所以也称NIO为异步非阻塞IO。
二.多路复用Selector
NIO的实现关键是多路复用IO技术。多路复用的核心是通过Selector来轮询注册在其上的Channel,执行Selector.select()方法进行轮询时是同步阻塞的。当发现有Channel处于就绪状态后,Selector.select()方法就会从阻塞状态中返回,然后就可以通过Selector.selectedKeys()方法获取就绪的Channel进行IO操作。
三.伪异步IO
在通信线程和业务线程之间做一个缓冲区,这个缓冲区用于隔离IO线程和业务线程间的直接访问,这样业务线程就不会被IO线程阻塞了。
对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不再直接访问IO线程或者进行IO读写,这样也就不会被同步阻塞。
7.NIO服务端的创建流程
步骤一:通过ServerSocketChannel的opne()方法打开ServerSocketChannel。
步骤二:设置ServerSocketChannel为非阻塞模式,绑定监听地址和端口。
步骤三:通过Selector的open()方法创建多路复用器Selector,将已打开的ServerSocketChannel注册到多路复用器Selector上以及监听ACCEPT事件。
步骤四:多路复用器Selector通过select()方法轮询准备就绪的SelectionKey。
步骤五:如果这个SelectionKey是acceptable,说明有客户端发起了连接请求。此时需要调用ServerSocketChannel的accept()方法来处理该接入请求,也就是完成TCP三次握手并建立物理链路以及得到该客户端连接SocketChannel。
步骤六:然后将新接入的客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。
步骤七:如果这个SelectionKey是readable,说明有客户端发送了数据过来。此时需要调用SocketChannel的read()方法异步读取客户端发送的数据到ByteBuffer缓冲区,同时将客户端连接SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步骤八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理结果对象encode编码放入ByteBuffer缓冲区,最后调用SocketChannel的write()方法异步发送给客户端,以及将客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。
public class NIOServer {
private static Selector selector;
public static void main(String[] args) {
init();
listen();
}
private static void init() {
//serverSocketChannel其实就是ServerSocket,负责跟各个客户端连接连接请求
//SelectionKey.OP_ACCEPT的意思是仅仅关注ServerSocketChannel接收到的TCP连接请求,也就是监听ACCEPT事件
//ServerSocketChannel是注册在Selector上面的
ServerSocketChannel serverSocketChannel = null;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();//步骤一:打开ServerSocketChannel
serverSocketChannel.configureBlocking(false);//步骤二:NIO就是支持非阻塞的
serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);//步骤二
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//步骤三:注册到selector + 关注OP_ACCEPT事件
} catch (IOException e) {
e.printStackTrace();
}
}
private static void listen() {
while(true) {
try {
//select()方法是阻塞的,看注册到Selector上的ServerSocketChannel是否接收到了请求
selector.select();//步骤四
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while(keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
//可以认为一个SelectionKey代表了一个请求
keysIterator.remove();
handleRequest(key);
}
} catch(Throwable t){
t.printStackTrace();
}
}
}
private static void handleRequest(SelectionKey key) throws IOException, ClosedChannelException {
//会有个线程池获取到了请求,下面的代码都应该在线程池的工作线程里处理
SocketChannel channel = null;
try {
//如果这个SelectionKey是acceptable,也就是连接请求
//那么注册对应的SocketChannel到selector上,并关注OP_READ事件
if (key.isAcceptable()) {//步骤五
System.out.println("[" + Thread.currentThread().getName() + "]接收到连接请求");
//从SelectionKey中拿出ServerSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//调用ServerSocketChannel的accept方法,就可以跟客户端进行TCP三次握手
channel = serverSocketChannel.accept();
System.out.println("[" + Thread.currentThread().getName() + "]建立连接时获取到的channel=" + channel);
//如果三次握手成功了之后,就可以获取到一个建立好TCP连接的SocketChannel
//这个SocketChannel大概可以理解为,底层有一个Socket是跟客户端进行连接的
//这个SocketChannel就是联通到那个Socket上去,负责进行网络数据的读写的
//下面配置成非阻塞的
channel.configureBlocking(false);
//将该SocketChannel注册到selector上,且仅仅关注READ请求,也就是关注发送数据过来的请求
channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
} else if (key.isReadable()) {//步骤七
//如果这个SelectionKey是readable,也就是发送了数据过来,此时需要读取客户端发送过来的数据
channel = (SocketChannel) key.channel();
//读取请求数据的buffer缓冲
ByteBuffer buffer = ByteBuffer.allocate(1024);
//通过底层的socket读取数据,写入buffer中,position可能就会变成比如21之类的
//socket读取到了多少个字节,此时buffer的position就会变成多少
int count = channel.read(buffer);//步骤七
System.out.println("[" + Thread.currentThread().getName() + "]接收到请求");
if (count > 0) {
//设置position = 0,limit = 21,仅仅读取buffer中,0~21这段刚刚写入进去的数据
buffer.flip();
System.out.println("服务端接收请求:" + new String(buffer.array(), 0, count));
channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
}
} else if (key.isWritable()) {//步骤八
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("Server收到了".getBytes());
buffer.flip();
channel = (SocketChannel) key.channel();
channel.write(buffer);//步骤八
channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
}
} catch(Throwable t) {
t.printStackTrace();
if (channel != null) {
channel.close();
}
}
}
}
8.NIO客户端的创建流程
步骤一:通过SocketChannel的open()方法打开SocketChannel。
步骤二:设置SocketChannel为非阻塞模式,同时设置TCP参数。
步骤三:调用SocketChannel的connect()方法异步连接服务端,完成TCP三次握手并建立物理链路。
步骤四:通过Selector的open()方法创建多路复用器Selector,并将已打开的SocketChannel注册到多路复用器Selector上以及监听CONNECT事件。
步骤五:多路复用器Selector通过select()方法轮询准备就绪的SelectionKey。
步骤六:如果这个SelectionKey是connectable,说明服务端接受了发起的连接请求,于是将SocketChannel注册到多路复用器Selector上以及监听READ事件。
步骤七:如果这个SelectionKey是readable,说明服务端返回了数据。于是调用SocketChannel的read()方法异步读取服务端返回的数据到ByteBuffer缓冲区,同时将客户端连接SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步骤八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理结果对象encode编码放入ByteBuffer缓冲区,最后调用SocketChannel的write()方法异步发送给客户端,以及将客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。
public class NIOClient {
public static void main(String[] args) {
//启动10个线程去和服务端建立通信
for (int i = 0; i < 10; i++) {
new Worker().start();
}
}
static class Worker extends Thread {
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
//SocketChannel底层就是封装了一个Socket
//SocketChannel是通过底层的Socket网络连接上服务端的数据通道,负责基于网络读写数据
channel = SocketChannel.open();//步骤一
//配置成非阻塞的
channel.configureBlocking(false);//步骤二
//底层会发起了一个TCP三次握手尝试建立连接
channel.connect(new InetSocketAddress("localhost", 9000));//步骤三
selector = Selector.open();
//监听connect行为
channel.register(selector, SelectionKey.OP_CONNECT);//步骤四:注册到selector + 关注OP_CONNECT事件
while(true) {
//三次握手成功后,服务端会给客户端返回一个响应请求,通过如下代码把响应请求拿到
selector.select();//步骤五
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while(keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
//如果server返回的是一个connectable的消息
if (key.isConnectable()) {//步骤六
channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
//一旦建立连接成功了以后,此时就可以给server发送一个数据了
channel.finishConnect();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好".getBytes());
buffer.flip();
channel.write(buffer);
}
//接下来监听READ事件就是准备读服务端返回的数据
channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
} else if (key.isReadable()) {//步骤七:说明服务器端返回了一条数据可以读了
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);//把数据写入buffer,position推进到读取的字节数数字
if (len > 0) {
System.out.println("[" + Thread.currentThread().getName() + "]收到响应:" + new String(buffer.array(), 0, len));
Thread.sleep(5000);
//睡眠5秒后,接下来继续监听WRITE事件,并准备写数据给服务端
channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
}
} else if (key.isWritable()) {//步骤八
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("重复你好了".getBytes());
buffer.flip();
channel = (SocketChannel) key.channel();
channel.write(buffer);
//再次重复发数据给服务端后,接下来要监听READ事件就是准备读服务端返回的数据
channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
9.NIO优点总结
优点一:SocketChannel的连接操作是异步的
也就是客户端发起的连接操作SocketChannel.connect()是异步的。可以将SocketChannel注册到多路复用器上并关注OP_CONNECT事件等待后续结果,不需要像BIO的客户端那样被同步阻塞。
优点二:SocketChannel的读写操作都是异步的
也就是客户端发起的读写操作SocketChannel.read()和write()是异步的。如果没有可读写的数据它不会同步等待,而会直接返回。这样IO通信链路就可以处理其他链路了,不需要同步等待这个链路可用。
优点三:优化了线程模型
由于JDK的Selector在Linux等主流操作系统上通过epoll实现,从而没有了连接句柄数的限制。这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随客户端增加而线性下降。注意:Selector.select()是同步阻塞的。
优点四:优化了IO读写
BIO的读写是面向流的,一次性只能从流中读取一子节或多字节,而且读完后流无法再读取,需要自己缓存数据。NIO的读写是面向Buffer的,可随意读取里面任何字节的数据。不需要自己缓存数据,只需要移动读写指针即可。
10.NIO问题总结
问题一:NIO的类库和API繁杂,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
问题二:处理常见问题的工作量和难度比较大,比如客户断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流等。
问题三:NIO的epoll bug会导致Selector空轮询,最终导致CPU达到100%。