nio多线程版本
多线程多路复用
多线程NIO,,就是多个线程,每个线程上都有一个Selector,,,比如说一个系统中一个线程用来接收请求,,剩余的线程用来读写数据,,每个线程独立干自己的事,,,
一个线程的多路复用,,虽然不会卡住,,但是执行单个事件的时间过长,也会长时间卡在那里,,,需要开启多个线程,,但是多个线程中执行代码的顺序是不可控的,,一般是在主线程接收到一个新的连接之后,再用子线程中的Selector去关注返回的SocketChannel
,
selector.select()
是在子线程中执行的,,,关注事件是在主线程执行的,,如果子线程中的selector.select()
先阻塞住了,,关注事件的代码就必须等到有新的事件到来,才会往下执行
如果子线程想要监听到这个事件,,注册事件的代码就必须在 selector.select()阻塞的代码的前面,,,但是这个是不可控的,,就需要使用selector.wakeup()
唤醒这个selector.select() 阻塞,,唤醒了之后,后面关注事件
的代码就执行了,后面监听到这个关注事件
,就会去处理
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 可用的核心数 ===》 如果在docker上,,拿到的是物理cpu个数,,而不是容器申请时的个数,,,在jdk10才修复
int i1 = Runtime.getRuntime().availableProcessors();
Worker[] workers = new Worker[2];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-"+i);
}
AtomicInteger index =new AtomicInteger(0);
while (true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
System.out.println("客户端已进入"+sc);
sc.configureBlocking(false);
System.out.println("before register...");
// 轮询选择器,选择一个线程执行
workers[index.getAndIncrement() % workers.length].register(sc);
// worker01.register(sc);
// 前面初始化 selector.select()阻塞了
// SelectionKey scKey = sc.register(worker01.selector, 0, null);
// scKey.interestOps(SelectionKey.OP_READ);
System.out.println("after register...");
// sc.register()
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
private volatile boolean start = false;// 是否初始化
public Worker(String name) {
this.name = name;
}
// 初始化线程和Selector
public void register(SocketChannel sc) throws IOException {
if (!start){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
start = true;
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
selector.wakeup();
// sc.register(selector,SelectionKey.OP_READ);
// // 唤醒阻塞,,, 阻塞住了,,下面的register就无法到达
// selector.wakeup();
}
@Override
public void run() {
while (true){
try {
selector.select();
// 最开始的时候 ,,,run如果先执行,,,queue里面是空的,,注册逻辑放在select()前面也是一样不会将 socketChannel注册进去,, 需要唤醒
Runnable task = queue.poll();
if (task != null){
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
System.out.println(Thread.currentThread().getName()+"name");
debugAll(buffer);
}else if(key.isWritable()){
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
名词
-
阻塞IO: 用户线程会一直等待,直到数据准备好,才返回,,,就是线程停止,等待数据
-
非阻塞IO:
用户程序空间
到操作系统的内核空间
==》 操作系统会立即返回,,用户线程始终在运行,并没有停下来 -
多路复用: 通过Selector监测事件,,检测到有事件就会到操作系统内核空间去执行
-
同步: 线程自己去获取结果
-
异步: 线程自己不去获取结果,,由另一个线程送结果 (至少有两个线程)
-
异步IO: 异步都是非阻塞的,,一个线程执行,,通过另一个线程返回结果,,,
阻塞IO
是同步的,,也叫同步阻塞,,,
非阻塞IO
也是同步的,也叫同步非阻塞
多路复用
也是同步的,,只是将所有的事件都集中到一起处理
零拷贝
一般的文件读取,都是要经过内核空间
到 用户空间
,再从用户空间
拷贝到 缓冲区
, 一个文件要拷贝很多次,,才能发送出去,,
而零拷贝
: 就是让文件拷贝,不再经过用户空间
,,直接就用操作系统内核空间里面的数据。。
零拷贝适合那种小文件拷贝,,因为大文件会占用很多内核缓冲区,,可能会影响别的IO操作
// 堆外内存(direct buffer) : 直接分配在堆外的内存,减少从堆内存到直接内存的拷贝
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
AIO(Asynchronous IO) 异步输入输出
非阻塞IO,,需要一个回调函数,去传递 执行完成之后返回的结果
public class Demo01 {
/**
* linux 对异步IO不友好,,, 底层只是用多路复用模拟了一个异步IO,,,性能上没有优势
* window系统通过IOCP实现了真正的异步IO
*
*
*/
public static void main(String[] args) throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 异步IO必须是多个线程,,,一个线程发起,一个线程送结果
Path path = Paths.get("/Users/chenjie/code/learn/learn-netty/learn-netty/netty01/src/main/resources/word.txt");
System.out.println(Files.exists(path));
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(8);
ByteBuffer attachBuffer = ByteBuffer.allocate(8);
System.out.println("read begin:"+Thread.currentThread().getName());
/**
* params: ByteBuffer
* params2: 读取的起始位置
* params3: 附件 : 万一一次读不完,,需要一个bytebuffer接着读
* params4: 真正结果出来了,调用这个回调方法
*/
channel.read(buffer,0,attachBuffer,new CompletionHandler<Integer, ByteBuffer>(){
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 当你的文件正确读取完毕后
attachment.flip();
debugAll(attachment);
System.out.println(Thread.currentThread().getName()+"hehe");
countDownLatch.countDown();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("exc = " + exc);
System.out.println(exc.getMessage()+"e");
}
});
System.out.println("read finished");
countDownLatch.await();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}