Java 输入与输出之 NIO【非阻塞式IO】【NIO网络编程】探索之【二】
上一篇博客我们介绍了NIO的核心原理、FileChannel和Buffer, 对Buffer的用法有了清晰的了解。上篇博客:
Java 输入与输出之 NIO【非阻塞式IO】【NIO核心原理】探索之【一】
本篇博客我们将继续来探索NIO,介绍如何使用SocketChannel和ServerSocketChannel来实现TCP协议的非阻塞套接字(Socket)网络通信程序编程。NIO的强大功能部分来自于Channel的非阻塞特性,套接字的某些操作可能会无限期地阻塞。例如,对accept()方法的调用可能会因为等待一个客户端连接而阻塞;对read()方法的调用可能会因为没有数据可读而阻塞,直到连接的另一端传来新的数据。总的来说,创建/接收连接或读写数据等I/O调用,都可能无限期地阻塞等待。
NIO的Channel抽象的一个重要特征就是可以通过配置它的阻塞行为,以实现非阻塞式的信道。
下面这行代码设置了通道的非阻塞模式:
channel.configureBlocking(false)
在非阻塞式信道上调用一个方法总是会立即返回。这种调用的返回值指示了所请求的操作完成的程度。例如,在一个非阻塞式ServerSocketChannel上调用accept()方法,如果有连接请求来了,则返回客户端SocketChannel,否则返回null。
三、利用NIO编写网络通信程序(Selector的核心应用)
网络通信相关的三大核心组件:
- 通道(channel):负责管道节点的连接及数据的运输
- 缓冲区(buffer):负责数据的存取
- 选择器(selector)及selectionKey:是selectableChannel的多路复用器,用于监控SelectableChannel的IO状况。而选择键(SelectionKey)则是一种将通道和选择器(Selector)进行关联的机制。
SocketChannel: 是通道Channel重要的实现类,用于TCP协议的网络通信,用于实现网络通讯客户端的应用程序;
ServerSocketChannel: 是通道Channel重要的实现类,用于监听TCP连接请求,主要用于实现网络通讯服务器端的应用程序。
阻塞与非阻塞
- 阻塞式
传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。 - 非阻塞式
Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同
时处理连接到服务器端的所有客户端。
利用Java NIO网络通讯应用程序的操作步骤和示意图
Java 在使用NIO进行网络编程时,多线程应用程序的操作步骤和示意图如下所示:
处理步骤:
- 创建通道:
打开一个或多个通道,例如FileChannel、SocketChannel等。
创建缓冲区:为每个通道创建一个或多个缓冲区,用于读取或写入数据。 - 注册通道:
将通道注册到选择器,以便选择器可以监控这些通道的状态。 - 选择就绪通道:
选择器等待通道就绪事件,一旦有通道准备好进行I/O操作,选择器将通知应用程序。 - 读取/写入数据:
应用程序从通道读取数据或将数据写入通道,使用缓冲区来传输数据。
其中,通道和缓冲区是一对一的关系。每个通道都有一个与之对应的缓冲区,用于存储数据。
选择器(Selector)可以同时监视多个通道的状态。一个选择器可以绑定多个通道,以实现多路复用。
selectionKey有以下四个方法,可用来测试四种状态,它们都会返回一个布尔类型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
通过Selector选择通道
一旦向Selector注册了一个或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。
下面是select()方法:
int select()
int select(long timeout)
int selectNow()
select()阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。
select()方法返回的int值表示有多少通道已经就绪。
网络通信应用例程
我们先来看一个网络通信应用的例程,客户端采用NIO实现,而服务端依旧使用IO实现。
- NIO非阻塞实现的客户端
采用NIO的客户端程序源代码:
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
public class SocketClientNIO {
public static void client(){
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
try
{
SocketAddress server = new InetSocketAddress("127.0.0.1",8080);
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false); //非阻塞式
socketChannel.connect(server);
//与服务端连接成功
if(socketChannel.finishConnect())
{
int i=0;
while(true)
{
TimeUnit.SECONDS.sleep(1);
String info = "客户端发送信息,第 "+ ++i +"条";
buffer.clear();
buffer.put(info.getBytes("GBK"));
buffer.flip(); //切换
while(buffer.hasRemaining()){
System.out.println(buffer);
socketChannel.write(buffer);
}
}
}
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
finally{
try{
if(socketChannel!=null){
socketChannel.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SocketClientNIO.client();
}
}
网络通讯程序的服务器端的实现详解:
网络通讯服务器程序的核心逻辑:
服务器首先创建了一个非阻塞的ServerSocketChannel,并将其注册到Selector上去监听ACCEPT事件(即连接事件)。
在一个无限循环中,它会调用selector.select()来等待注册的事件发生,并处理这些事件。对于每个ACCEPT事件,它会接受新的连接,并将其设置为非阻塞模式。
在实际应用中,可以在接受连接后注册更多的事件(如读、写事件),并在事件处理代码中实现网络通信的业务处理逻辑。
选择器selector使用步骤
创建选择器selector
通过调用Selector.open()方法创建一个Selector。
向选择器selector注册通道
注册之前,先设置通道为非阻塞的,channel.configureBlocking(false);然后再调用SelectableChannel.register(Selector sel,int ops)方法将channel注册到Selector中;其中ops的参数作用是设置选择器对通道的监听事件,ops参数的事件类型有四种(可以通过SelectionKey的四个常量表示):
(1)读取操作:SelectionKey.OP_READ,数值为1.
(2)写入操作:SelectionKey.OP_WRITE,数值为4.
(3)socket连接操作:SelectionKey.OP_CONNECT,数值为8.
(4)socket接受操作:SelectionKey .OP_ACCEPT,数值为16.
若注册时不止监听一个事件,可以使用“| 位或”操作符连接。
选择键SelectionKey
表示SelectableChannel在Selector中的注册的标志,每次向选择器注册通道的时候就会选择一个事件(以上四种事件类型)即选择键,选择键包含两个表示位整数值的操作集(分别为interst集合和ready集合),操作集的每一位都表示该键的通道所支持的一类可选择操作。
下面我们提供三个服务器应用程序的版本。
- 服务器版本一,采用阻塞式IO实现的网络通讯服务器端程序(特点是简单)的源代码:
package nio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
public class SocketServerIO {
public static void server(){
ServerSocket serverSocket = null;
InputStream in = null;
try
{
serverSocket = new ServerSocket(8080);
System.out.println("服务器开启,等待接收客户端连接");
int recvMsgSize = 0;
byte[] recvBuf = new byte[1024];
while(true){
Socket client = serverSocket.accept();
SocketAddress clientAddr = client.getRemoteSocketAddress();
System.out.println("接收到客户端,连接IP: "+clientAddr);
in = client.getInputStream();
while((recvMsgSize=in.read(recvBuf))!=-1){
byte[] temp = new byte[recvMsgSize];
System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);
System.out.println("接收报文:"+new String(temp,"GBK"));
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally{
try{
if(serverSocket!=null){
serverSocket.close();
}
if(in!=null){
in.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SocketServerIO.server();
}
}
现在我们来调试一下这一对服务端和客户端网络通讯程序。
首先,编译后执行服务端应用程序,开启服务器服务,以接受客户端的请求;
然后,编译执行客户端的应用程序。下面是服务端的测试效果:
- 服务器版本二,利用NIO实现的非阻塞模式的网络通讯服务器端
本例程的Buffer采用非直接缓冲区:
// 创建一个缓冲区
private static ByteBuffer buffer = ByteBuffer.allocate(1024);
服务器源代码如下:
package nio;
/***
* @author QiuGen
* @description NIO网络通讯服务器
* *** 本例程的Buffer采用非直接缓冲区
* @date 2024/8/28
* ***/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class SocketServer {
private static final int TIMEOUT = 3000;
// 创建一个缓冲区
private static ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void handleRead(SelectionKey key) throws IOException{
// 获取当前key的通道
SocketChannel socketChannel = (SocketChannel) key.channel();
// 从通道中读取数据到缓冲区
int len = socketChannel.read(buffer);
String inf = null;
byte[] tmpBuf = new byte[1024];
while ( len > 0 ) {
buffer.flip(); //切换
while(buffer.hasRemaining()){
buffer.get(tmpBuf, 0, len);
inf = new String(tmpBuf, "GBK");
System.out.print("接收报文: "+inf);
}
System.out.println();
buffer.clear(); //初始化缓冲区
len = socketChannel.read(buffer);
}
}
public static void handleWrite(SelectionKey key) throws IOException{
//ByteBuffer buf = (ByteBuffer)key.attachment();
buffer.flip(); //切换
SocketChannel sc = (SocketChannel) key.channel();
while(buffer.hasRemaining()){
sc.write(buffer);
}
buffer.compact();
}
public static void main(String[] args) throws IOException {
// 创建一个服务器套接字通道
ServerSocketChannel socketChannel = ServerSocketChannel.open();
// 将服务器套接字通道绑定到指定端口
socketChannel.bind(new InetSocketAddress(8080));
// 将服务器套接字通道设置为非阻塞模式
socketChannel.configureBlocking(false);
// 创建一个选择器
Selector selector = Selector.open();
// 将服务器套接字通道注册到选择器上,监听连接事件
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("SocketServerNIO非直接缓冲区,服务端启动***");
while (true) {
//等待选择器Selector上注册的事件
if(selector.select(TIMEOUT) == 0){
System.out.println("服务器空闲,等待客户端连接***");
continue;
}
// 获取所有发生的SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历所有SelectionKey
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 获取当前SelectionKey
SelectionKey key = iterator.next();
// 判断当前键的通道是否准备好接收socket连接
if (key.isAcceptable()) { // 处理客户端连接事件
// 接受客户端连接
SocketChannel sc = socketChannel.accept();
// 将客户端连接通道设置为非阻塞模式
sc.configureBlocking(false);
// 将客户端连接通道注册到选择器上,监听读事件
sc.register(selector, SelectionKey.OP_READ);
// 判断当前key的通道是否准备好读取操作
}
if (key.isReadable()) { //处理读事件
handleRead(key);
}
if(key.isWritable() && key.isValid()){ //处理写事件
handleWrite(key);
}
// 移除当前事件
iterator.remove();
}
}
}
}
与客户端连调,测试结果如下:
- 服务器版本三,利用NIO实现的非阻塞模式的网络通讯服务器端应用程序源码:
本例程的Buffer采用直接缓冲区:
ByteBuffer.allocateDirect(BUF_SIZE);
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class SocketServerNIO {
private static final int BUF_SIZE=1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void handleAccept(SelectionKey key) throws IOException{
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException{
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buf = (ByteBuffer)key.attachment();
int bytesRead = sc.read(buf);
String inf = null;
byte[] tmpBuf = new byte[1024];
while(bytesRead>0){
buf.flip();
while(buf.hasRemaining()){
//System.out.print((char)buf.get());
buf.get(tmpBuf, 0, bytesRead);
inf = new String(tmpBuf, "GBK");
System.out.print("接收报文: "+inf);
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if(bytesRead == -1){
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException{
ByteBuffer buf = (ByteBuffer)key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while(buf.hasRemaining()){
sc.write(buf);
}
buf.compact();
}
public static void server() {
Selector selector = null;
ServerSocketChannel ssc = null;
try{
selector = Selector.open();
ssc= ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("SocketServerNIO,服务端启动***");
while(true){
if(selector.select(TIMEOUT) == 0){
System.out.println("服务端空闲,等待客户端连接==");
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
handleAccept(key);
}
if(key.isReadable()){
handleRead(key);
}
if(key.isWritable() && key.isValid()){
handleWrite(key);
}
if(key.isConnectable()){
System.out.println("isConnectable = true");
}
iter.remove();
}
}
}catch(IOException e){
e.printStackTrace();
}finally{
try{
if(selector!=null){
selector.close();
}
if(ssc!=null){
ssc.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SocketServerNIO.server();
}
}
以同样方法进行网络通讯测试:首先编译后执行服务器端应用程序;然后编译执行上面同一个客户端应用程序发送报文。服务端测试结果如下图:
说明:
对于大负荷的服务器应用程序实现时,建议Buffer采用直接缓冲区方案,应当会有更优异的性能。
参考文献&博客:
- 参考文献之一
攻破JAVA NIO技术壁垒 - 参考文献之二
Java NIO详解[通俗易懂] - 参考文献之三
Java NIO全面详解(看这篇就够了)