Netty基础—4.NIO的使用简介一
大纲
1.Buffer缓冲区
2.Channel通道
3.BIO编程
4.伪异步IO编程
5.改造程序以支持长连接
6.NIO三大核心组件
7.NIO服务端的创建流程
8.NIO客户端的创建流程
9.NIO优点总结
10.NIO问题总结
1.Buffer缓冲区
(1)Buffer缓冲区的作用
(2)Buffer缓冲区的4个核心概念
(3)使用Direct模式创建的Buffer缓冲区
(4)如何分配和读写一个Buffer缓冲区
(5)如何操作一个分配好的Buffer缓冲区
(1)Buffer缓冲区的作用
在NIO中,所有的数据都是通过使用Buffer缓冲区来处理的。如果要通过NIO,将数据写到文件和网络或从文件和网络中读取数据,那么就需要使用Buffer缓冲区来进行处理。
(2)Buffer缓冲区的4个核心概念
Buffer缓冲区本质上是一个数组。通常它是一个字节数组ByteBuffer,当然还有其他基本类型的数组如CharBuffer。Buffer缓冲区不仅仅是一个数组,还可以对数据进行访问和对读写位置进行维护。所以Buffer缓冲区引入了4个核心概念:capacity、limit、position、mark。
一.capacity表示缓冲区的容量大小
也就是Buffer缓冲区里包含的数据的大小。如下所示,用字节数组封装了一个ByteBuffer,可以看其capacity是多少。
byte[] data = new byte[]{1, 2, 3};
ByteBuffer buffer = ByteBuffer.wrap(data);
System.out.println(buffer.capacity());
//输出显示ByteBuffer的capacity是3,因为字节数组data的大小是3
二.limit表示对Buffer缓冲区使用的一个限制
limit可以限制最多可以读取多少容量为capacity的缓冲区的数据。默认limit = capacity,limit后的位置不能读写数据。比如上述例子的3个数据,可用如下代码限制只能读取到index = 1的位置。
buffer.limit(1);
System.out.println(buffer.limit());
三.position表示数组中可以读写的位置
position不能大于limit。它会随着读写一直自动推进,直到跟limit一样才不能读写。如果手动设置的position大于limit,那么自动把limit设置为position。此外,remaining表示position到limit之间的距离。
四.mark表示对当前position位置的标记
在某个position的时候,设置一下mark,此时就可以设置一个标记。后续调用reset()方法可以把position复位到当时设置的那个mark位置上。把position或limit调整为小于mark的值时,就会丢弃这个mark。
总结:Buffer缓冲区就是一个数据缓冲区,可以支持不同的数据类型。比如ByteBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、IntBuffer,这些不同的Buffer里面就可以包裹不同基本类型的数组。每个Buffer类都是Buffer接口的子实例,大多数标准IO操作都使用ByteBuffer。
Buffer缓冲区还引入了capacity、limit、position、mark等概念,通过这些概念可以对Buffer缓冲区里的数据进行访问以及维护读写位置等信息。比如可通过limit、position、mark来限制能读那些数据,从哪里开始读。
下面是一个使用Buffer缓冲区的示例:
public class BufferDemo {
public static void main(String[] args) throws Exception {
byte[] data = new byte[] {55, 56, 57, 58, 59};
ByteBuffer buffer = ByteBuffer.wrap(data);
System.out.println(buffer.capacity());
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.get());//把当前position所在位置的数据读取一位出来
System.out.println(buffer.position());
buffer.mark();//在position = 1的时候打的mark,标记
//buffer.position(3);
//buffer.limit(4);
buffer.position(3);
System.out.println(buffer.get());
System.out.println(buffer.position());
buffer.reset();
System.out.println(buffer.position());
}
}
(3)使用Direct模式创建的Buffer缓冲区
有个缓冲区是比较特殊的,叫做Direct模式的缓冲区,它的整体性能比普通模式的缓冲区要高些。
//适用于从磁盘文件读数据出来,或者从网络里读数据进来
ByteBuffer buffer1 = ByteBuffer.allocate(10);
//如果用direct模式分配Buffer,整体性能可以比普通模式稍微高些
ByteBuffer buffer2 = ByteBuffer.allocateDirect(10);
(4)如何分配和读写一个Buffer缓冲区
一.ByteBuffer.allocateDirect(100)
分配一个Direct缓冲区,效率更高。
二.ByteBuffer.wrap(byte[] array)
把一个字节数组作为数据放到缓冲区。
三.ByteBuffer.put(byte b) 和 ByteBuffer.get()
对当前position位置放入一个字节数据,或者读取一个字节数据。
四.ByteBuffer.put(byte[] src, int offset, int length) 和 ByteBuffer.get(byte[] dst, int offset, int length)
把指定src数组里的一段数据写入缓冲区,或者是从缓冲区里读取数据到数组中。
五.ByteBuffer.put(byte[] src) 和 ByteBuffer.get(byte[] dst)
把数组全部写入缓冲区,以及从缓冲区读取全部数据到数组里去。
下面是这些API方法的使用示例:
public class BufferDemo2 {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocateDirect(100);
System.out.println("position=" + buffer.position());
System.out.println("capacity=" + buffer.position());
System.out.println("limit=" + buffer.limit());
byte[] src = new byte[] {1, 2, 3, 4, 5};
buffer.put(src);
System.out.println("position=" + buffer.position());//position = 0~4,都填充了数据
//此时position = 5,如果直接读取数据,是读不到的,会发现是空的、没有数据
//所以需要手动操作一下position,调整到position = 0的位置,才能读到数据大小为5的字节数组src
buffer.position(0);
byte[] dst = new byte[5];
buffer.get(dst);
System.out.println("position=" + buffer.position());
System.out.print("dst=[");
for (int i = 0; i < dst.length; i++) {
System.out.print(i);
if (i < dst.length - 1) {
System.out.print(",");
}
}
System.out.print("]");
}
}
(5)如何操作一个分配好的Buffer缓冲区
一.ByteBuffer.clear()
还原缓冲区的状态:position设置为0、limit设置为capacity、丢弃mark。
但是本质并不是删除数据,只是还原了那些标记位而已。
还原之后就可以复用缓冲区里的空间,覆盖老的数据了。
二.ByteBuffer.flip()
准备读取刚写入的数据,就是将limit设置为当前position、将position设置为0、丢弃mark。
一般在写入完数据到缓冲区后,准备从位置=0开始读这段数据时,就可以使用flip。
三.ByteBuffer.rewind()
将position设置为0、并且丢弃mark。
一般在读取了一遍数据后,还想要再次重新读取一遍数据时,就可以使用rewind,此时limit是不变的。
一般比较少会遇到直接操作Buffer的position、limit和mark的场景,通常都会使用上述操作Buffer缓冲区的几个方法:首先执行put()方法往缓冲区里写入数据,然后打算复用时,就执行clear()方法再重新写数据。如果写了一段数据打算要读取数据了,就执行flip()方法。如果希望重新读取一遍数据,就执行rewind()方法。
2.Channel通道
(1)NIO编程中的Channel是什么
(2)NIO编程中Buffer和Channel的关系
(3)基于FileChannel将数据写入磁盘文件
(4)利用FileChannel实现顺序写和随机写
(5)FileChannel是多线程并发安全的
(6)从磁盘文件读取数据到Buffer缓冲区
(7)对文件上共享锁限制文件只读
(8)FileChannel的强制刷盘
(9)FileChannle的position
(1)NIO编程中的Channel是什么
Channel是一个通道,网络数据通过Channel读取和写入。通道可以用于读、写或者二者同时进行。通道与流的区别在于:通道是双向的,流是单向的,一个流必须是InputStream和OutputStream的子类。
因为Channel是全双工的,所以它可以比流更好映射底层操作系统的API。在UNIX网络编程中,底层操作系统的通道都是全双工的,同时支持读写操作。
Channel可以分为两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel,其中ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
(2)NIO编程中Buffer和Channel的关系
(3)基于FileChannel将数据写入磁盘文件
public class FileChannelDemo {
public static void main(String[] args) throws Exception {
//构造一个传统的文件输出流
FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
//通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
FileChannel channel = out.getChannel();
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
channel.close();
out.close();
}
}
(4)利用FileChannel实现顺序写和随机写
顺序写磁盘文件,就是不停地在文件末尾追加数据。
一.顺序写的例子
public class FileChannelDemo {
public static void main(String[] args) throws Exception {
//构造一个传统的文件输出流
FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
//通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
FileChannel channel = out.getChannel();
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
//channel必然会从buffer的position = 0的位置开始读起,一直读到limit,limit = 字符串字节数组的长度
//buffer的position此时就已经变成了跟limit一样了
System.out.println(buffer.position());//输出11
System.out.println(channel.position());//输出11,当前写到了文件的哪一个位置
//继续往磁盘文件里写,就是从FileChannel的position = 11开始写,相当于文件末尾追加
//如果想再次将buffer里的数据通过channel写入磁盘文件末尾,就是顺序写
buffer.rewind();//position = 0,重新读一遍
channel.write(buffer);//在文件末尾追加写的方式,顺序写,写完后文件内容就是"hello worldhello world"
channel.close();
out.close();
}
}
二.随机写的例子
public class FileChannelDemo {
public static void main(String[] args) throws Exception {
//构造一个传统的文件输出流
FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
//通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
FileChannel channel = out.getChannel();
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
//channel必然会从buffer的position = 0的位置开始读起,一直读到limit,limit = 字符串字节数组的长度
//buffer的position此时就已经变成了跟limit一样了
System.out.println(buffer.position());//输出11
System.out.println(channel.position());//输出11,当前写到了文件的哪一个位置
//如果继续往磁盘文件里写,就是从FileChannel的position = 11开始写,相当于文件末尾追加
//现在已经在文件写入hello world,要继续在hello和world中间的那个空格地方,再写入一条数据,比如hello world
//把这一段数据插入到磁盘文件的中间,就是磁盘随机写
//在文件的随机的位置写入数据,肯定是要再次从buffer中读取数据,所以position必须复位
buffer.rewind();
//其次如果你要基于FileChannel随机写,可以调整FileChannel的position,这样文件内容为"hellohello world"
channel.position(5);
channel.write(buffer);
channel.close();
out.close();
}
}
(5)FileChannel是多线程并发安全的
多线程并发写FileChannel是线程安全的。一个线程先执行完了写文件,下一个线程才能有机会去写,不可能多个线程同时基于一个FileChannel来写的。
public class FileChannelDemo2 {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
//构造一个传统的文件输出流
FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
//通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
final FileChannel channel = out.getChannel();
//输出文件的内容是10个顺序的hello world
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
}
}
//当然这里的out并没有关闭
}
(6)从磁盘文件读取数据到Buffer缓冲区
通过FileChannel + FileInputStream可以实现从磁盘文件读取数据到缓冲区。
public class FileChannelDemo3 {
public static void main(String[] args) throws Exception {
FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
FileChannel channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocateDirect(16);
channel.read(buffer);//读数据写入buffer,所以写完以后,buffer的position = 16
buffer.flip();//position = 0,limit = 16
byte[] data = new byte[16];
buffer.get(data);
System.out.println(new String(data));
channel.close();
in.close();
}
}
(7)对文件上共享锁限制文件只读
JVM内的多个线程对同一个FileChannel进行写,是线程安全的。但如果是多个JVM对同一个FileChannel进行写,则不是线程安全。
为此,FileChannel提供了文件锁功能,可以对文件上锁。FileChannel的文件锁分为共享锁和独占锁。
有线程先获取了共享锁读文件,其他JVM里的线程就不能获取独占锁写文件。有线程先获取了独占锁写文件,其他JVM里的线程就不能获取共享锁读文件。
如果对文件上共享锁,不同JVM的不同线程都可以同时获取共享锁读文件,除非已有某个JVM的某线程获取到了独占锁写文件。
一.先运行如下的FileLockDemo1程序启动一个JVM对文件加共享锁
public class FileLockDemo1 {
public static void main(String[] args) throws Exception {
//新建一个输入流
FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
FileChannel channel = in.getChannel();
//注意输入流是不能加独占锁的,否则会报错
channel.lock(0, Integer.MAX_VALUE, true);//加共享锁
Thread.sleep(60 * 60 * 1000);
channel.close();
in.close();
}
}
二.再运行如下的FileLockDemo2程序启动另一个JVM
由于该程序对文件加的是独占锁,所以此时会报错,只能加共享锁。
public class FileLockDemo2 {
public static void main(String[] args) throws Exception {
RandomAccessFile file = new RandomAccessFile("/Users/demo/Downloads/hello.txt", "rw");
FileChannel channel = file.getChannel();
//加独占锁,阻塞住,会等待别人释放共享锁了,这里才能成功加独占锁
channel.lock(0, Integer.MAX_VALUE, false);
System.out.println("加锁成功");
Thread.sleep(60 * 60 * 1000);
channel.close();
file.close();
}
}
(8)FileChannel的强制刷盘
通过FileChannel写数据到磁盘文件时,不会立即将数据写到磁盘上,而会先将数据写到操作系统自己管理的一个内存区域OS Cache上。这样处理的好处是可以让磁盘写的性能比较高,但如果此时系统宕机,那么OS Cache里的数据可能就会丢失。
Kafka和ElasticSearch将数据写到磁盘时,都是先写入OS Cache的,操作系统会定时把OS Cache里的数据写入到磁盘上。
如果希望数据可以马上写入磁盘,就不能只用FileChannel.write()方法,还需要通过FileChannel.force()方法将数据从OS Cache强制刷入磁盘。当然,FileChannel将数据强制刷盘会让磁盘写的性能降低。
public class FileLockDemo1 {
public static void main(String[] args) throws Exception {
//新建一个输入流
FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
FileChannel channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
channel.force(true);//强制数据从OS Cache刷入磁盘,避免停留在OS Cache中
channel.close();
in.close();
}
}
(9)FileChannel的position
FileChannel有一个position的概念,它相当于一个指针,指向了当前文件的某个位置。
读数据是从FileChannel当前的position开始读的,每次刚开始初始化一个FileChannel后,position = 0。如果要随机读取文件里的某个位置,那么直接设置和定位FileChannel的position,就可以限定从什么位置开始读。
写数据也是从FileChannel当前的position开始写的,刚开始position = 0。如果直接写,那么可能会覆盖原来的老数据。读了多少字节的数据或写了多少字节的数据,FileChannel的position就会往前移动多少位。
3.BIO编程
(1)网络编程的Client/Server模型
(2)BIO服务端通信模型
(3)BIO网络编程的服务端
(4)BIO网络编程的客户端
(1)网络编程的Client/Server模型
网络编程的基本模型是Client/Server模型。客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接。如果连接建立成功,双方就可以通过网络套接字Socket进行通信。
在基于传统同步阻塞模型(BIO)开发中:ServerSocket负责绑定IP地址、启动监听端口,Socket负责发起连接操作。在连接成功后,双方通过输入和输出流进行同步阻塞通信。
(2)BIO服务端通信模型
通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求后会为每个客户端创建一个新的线程进行业务处理。业务处理完成后,通过输出流返回应答给客户端,然后销毁线程。这是典型的一请求一应答通信模型。
(3)BIO网络编程的服务端
//服务端(为简洁省略try catch)
public class SocketServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(9000);
Socket socket = serverSocket.accept();//在这里会阻塞住,一直等待客户端建立连接
//如果有个客户端要和当前服务端尝试建立TCP连接,并基于TCP协议来传输数据,发送一个一个的TCP包
//那么客户端必须先和当前服务端的端口号建立连接
//客户端Socket和服务端ServerSocket互相传递3次握手的TCP包,TCP连接就建立了
//当客户端向当前服务端发起TCP三次握手建立一个连接后,这里就会构建出一个Socket
//这个Socket就代表了当前服务端和某个客户端的一个TCP连接(Socket连接)
//当建立了TCP连接后,客户端就会通过IO流的方式发送数据过来
//由于IO流是无限的流,所以底层的TCP协议会把流式的数据拆分为一个一个的TCP包
//然后将TCP包包裹在IP包里,IP包又会被包裹在以太网包里
//最后通过底层的网络硬件设备以及以太网的协议,发送以太网包的数据
//获取Socket的输入流
InputStreamReader in = new InputStreamReader(socket.getInputStream());
//获取Socket的输出流
OutputStream out = socket.getOutputStream();
char[] buf = new char[1024 * 1024];//JVM的一个缓冲数组
int len = in.read(buf);
//Socket的输入流,相当于不停读取客户端通过TCP协议发送过的一个一个的TCP包
//然后把TCP包里的数据通过IO输入流的方式提供给服务端
//这样服务端就可以通过IO输入流读取的方式,把TCP包里的数据读出来,然后放入JVM内存的一个缓冲数组中
while (len != -1) {
String request = new String(buf, 0, len);
System.out.println("服务端接收到了请求:" + request);
//输出流的意思是,服务端会通过IO流发送响应数据回客户端
//此时在底层会把服务端的响应数据拆分为一个一个的TCP包,回传给客户端
//这样客户端就可以接收到服务端发送的TCP包了
out.write("收到,收到".getBytes());
//in.read会阻塞在这里尝试读取数据,所以out.write要放在前面
len = in.read(buf);
//为什么需要通过while循环反复去读取Socket流传输过来的数据?
//因为客户端是不停地用流的方式发送数据给服务端的,所以服务端需要不停地读取
//此外,由于buf才1KB,如果客户端发送过来的数据是几十KB,
//那么当服务端读取完1KB的数据后,还需要继续读取几十KB的数据,
//因此才需要服务端不停的读取
}
//释放资源
out.close();
in.close();
//通过TCP四次挥手断开连接
socket.close();
serverSocket.close();
}
}
(4)BIO网络编程的客户端
//客户端(为简洁省略try catch)
public class SocketClient {
public static void main(String[] args) throws Exception {
//此处应该是会找DNS服务查找域名对应的IP地址
Socket socket = new Socket("localhost", 9000);
//接下来需要跟某个IP地址上的9000端口的服务器程序进行TCP三次握手,然后建立连接
//这时就会构造一个三次握手中的第一次握手的TCP包,在这个TCP包里放入三次握手需要的数据
//然后把这个TCP包封装在IP包里,IP包里是有对应的目标的IP地址,IP包再封装在以太网包里
//接着通过底层的硬件设备和以太网协议把以太网包发送出去
//经过路由器时,会通过IP地址查找路由表来确定下一个路由器的位置
//查找到下一个路由器的mac地址后将其写入到以太网包头,继续通过下一个子网广播出去
//通过这种方式层层转发,一直到对应的服务器上去
//服务端接收到三次握手的第一次握手的TCP包后
//服务端就会回传第二次握手的TCP包给这个客户端的程序,客户端会再次发送第三次握手的TCP包过去
//这样三次握手成功,TCP连接建立起来了
InputStreamReader in = new InputStreamReader(socket.getInputStream());
OutputStream out = socket.getOutputStream();
//发送数据流,底层拆分为一个一个的TCP包发到服务端去
out.write("你好".getBytes());
char[] buf = new char[1024 * 1024];
int len = in.read(buf);
while (len != -1) {
String response = new String(buf, 0, len);
System.out.println("客户端接收到了响应:" + response);
len = in.read(buf);
}
//释放资源
in.close();
out.close();
//通过TCP四次挥手断开连接
socket.close();
}
}
(5)BIO网络编程存在的问题
缺乏弹性伸缩能力,服务端的线程个数和客户端的并发访问数呈1 : 1的正比关系。当客户端并发访问量增加后,可能会耗尽服务端的线程资源。