Scalable IO-NIO实践
Scalable IO-NIO实践
上一章我们详细讲解了NIO的实现原理,这一章我们来亲手实现一个Reactor。
您可以仔细阅读下面的代码,关键步骤都写了详细的注释。
1.Reactor
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
/**
 * Minimal single-threaded Reactor: one selector loop accepts connections on
 * 127.0.0.1:8080 and drives a per-connection {@link Handler} that reads one
 * message, prints it, sends a fixed reply and then closes the connection.
 *
 * <p>The event loop in {@link #run()} runs on the calling thread until that
 * thread is interrupted or the selector fails.
 */
public class Reactor {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and a non-blocking server channel bound to
     * 127.0.0.1:8080, and registers it for accept events with an
     * {@link Acceptor} attached to the key.
     *
     * @throws IOException if the selector or channel cannot be opened, bound or registered
     */
    Reactor() throws IOException {
        // Open the selector.
        selector = Selector.open();
        // Open the server channel.
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        // Listen on local port 8080.
        serverSocket.bind(new InetSocketAddress("127.0.0.1", 8080));
        // Channels must be non-blocking before they can be registered with a selector.
        serverSocketChannel.configureBlocking(false);
        // Only accept events are of interest for the listening channel.
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // The attachment is the Runnable that dispatch() will invoke for this key.
        key.attach(new Acceptor());
    }

    /**
     * Runs the event loop: blocks in {@code select()}, then dispatches every
     * selected key exactly once. Returns when the current thread is interrupted
     * or when an exception escapes the loop; in the latter case the failure is
     * reported on {@code System.err} instead of being dropped silently.
     */
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Block until at least one registered channel is ready.
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // The selector never removes keys from the selected-key set on its
                    // own; without this, already-handled keys would be dispatched again
                    // on every later pass.
                    iterator.remove();
                    dispatch(key);
                }
            }
        } catch (Exception e) {
            // Loop boundary: report why the reactor stopped rather than swallowing it.
            System.err.println("Reactor event loop stopped: " + e);
        }
    }

    /**
     * Runs the {@link Runnable} attached to the given key, if any.
     *
     * @param key a selected key whose attachment is an Acceptor or Handler
     */
    public void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    /** Accepts a pending connection and hands it to a new {@link Handler}. */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                // Non-blocking accept: returns null when no connection is pending.
                SocketChannel c = serverSocketChannel.accept();
                if (c != null) {
                    try {
                        new Handler(selector, c);
                    } catch (IOException e) {
                        // Handler setup failed; do not leak the accepted channel.
                        c.close();
                        throw e;
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Per-connection state machine: READING until a message arrives, then
     * SENDING until the reply has been written completely, then closed.
     */
    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        // Fixed-size buffers for inbound data and the outbound reply.
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(1024);
        // Processing states.
        static final int READING = 0, SENDING = 1;
        int state = READING;
        // Used for both decoding inbound bytes and encoding the reply.
        Charset charset = Charset.forName("UTF-8");

        /**
         * Switches the channel to non-blocking mode, registers it with the
         * selector for read events and attaches this handler to the key.
         *
         * @param sel selector to register with
         * @param c   accepted client channel
         * @throws IOException if the channel cannot be configured or registered
         */
        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = (SocketChannel) c.configureBlocking(false);
            // Register with no interest first so the handler is attached before
            // any event can be selected for this key.
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        /** Performs the step for the current state; closes the connection on I/O failure. */
        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException e) {
                System.err.println("Closing connection after I/O error: " + e);
                // Leaving a broken channel registered would keep re-triggering the error.
                close();
            }
        }

        /**
         * Reads available bytes, prints them as UTF-8 text, prepares the reply
         * and switches to the SENDING state. Closes the connection if the peer
         * has closed its side.
         *
         * @throws IOException if reading from the channel fails
         */
        void read() throws IOException {
            int read = socket.read(input);
            if (read < 0) {
                // End of stream: the peer closed the connection, nothing to decode.
                close();
                return;
            }
            if (read == 0) {
                // Nothing arrived yet; stay in READING and wait for the next event.
                return;
            }
            String s = new String(input.array(), 0, read, charset);
            System.out.println(s);
            input.clear();
            // Stage the reply once, so a partial write can resume from where it stopped.
            output.clear();
            output.put("服务端发送消息".getBytes(charset));
            output.flip(); // make the staged bytes readable by write()
            state = SENDING;
            sk.interestOps(SelectionKey.OP_WRITE);
        }

        /**
         * Writes the staged reply. If the socket accepts only part of it, the
         * handler stays registered for write events; once everything is written
         * the connection is closed.
         *
         * @throws IOException if writing to the channel fails
         */
        void send() throws IOException {
            socket.write(output);
            if (output.hasRemaining()) {
                // Non-blocking write was partial; finish on the next OP_WRITE event.
                return;
            }
            close();
        }

        /** Cancels the selection key and closes the client channel. */
        private void close() {
            sk.cancel();
            try {
                socket.close();
            } catch (IOException ignored) {
                // The connection is being discarded; there is nothing left to recover.
            }
        }
    }
}
2.测试类
public class TestReactor {
public static void main(String[] args) throws IOException {
Reactor reactor = new Reactor();
reactor.run();
}
}
3.客户端
/**
 * Demo client: connects to 127.0.0.1:8080, sends one UTF-8 message and prints
 * every reply it receives until the server closes the connection.
 */
class NIO2Client {
    // Must match the charset the server uses to decode and encode messages.
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Runs the client. The selector and the channel are always closed on exit,
     * whether the server closes the connection or an exception is thrown.
     *
     * @param args unused
     * @throws IOException          if connecting, selecting, reading or writing fails
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("客户端已启动");
        // Open the selector.
        Selector selector = Selector.open();
        try {
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));
            try {
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);

                // Send the message to the server.
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("my name is Lordyi".getBytes(UTF_8));
                // flip() makes the bytes just put into the buffer available to write().
                buffer.flip();
                // A non-blocking write may accept only part of the buffer.
                while (buffer.hasRemaining()) {
                    socketChannel.write(buffer);
                }

                while (true) {
                    // Block until the channel has something to read.
                    selector.select();
                    Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
                    while (selectionKeys.hasNext()) {
                        SelectionKey selectionKey = selectionKeys.next();
                        // Handled keys must be removed from the selected-key set by hand.
                        selectionKeys.remove();
                        if (selectionKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            // Reset the buffer so the incoming data starts at index 0.
                            buffer.clear();
                            int read = channel.read(buffer);
                            if (read == -1) {
                                // Server closed the connection; the finally blocks clean up.
                                return;
                            }
                            if (read > 0) {
                                // Decode the received bytes as text.
                                String s = new String(buffer.array(), 0, read, UTF_8);
                                System.out.println(s);
                            }
                        }
                    }
                }
            } finally {
                socketChannel.close();
            }
        } finally {
            selector.close();
        }
    }
}
4.演示结果
参考文献:Doug Lea,《Scalable IO in Java》
State University of New York at Oswego