当前位置: 首页 > article >正文

nio使用

NIO : new Input/Output,,在java1.4中引入的一套新的IO操作API,,,旨在替代传统的IO(即BIO:Blocking IO),,,nio提供了更高效的 文件和网络IO的 操作,,

NIO中分为阻塞模式(Blocking)和非阻塞模式(Non-blocking),,通过configureBlocking(boolean)方法设置,

  • 阻塞模式:

    • I/O 操作阻塞线程: read() :如果没有数据可读,调用会一直阻塞等待数据
      write(): 如果网络缓冲区已经满了,会一直阻塞,直到缓冲区有位置 ,
      accept(): 会一直等待客户端连接
      这些操作都需要一个线程去维持,如果是高并发项目,线程池会打满,,
      与传统的BIO(blocking IO)类似,只是 底层实现更高效
  • 非阻塞模式 (多路复用)
    一般会和Selector一起使用, Selector是NIO中的一个关键组件,,可以监听多个通道触发的事件

事件的类型:

  • accept : 客户端发起连接请求时触发
  • connect : 连接建立触发的事件
  • read : 可读事件,读数据的时候触发,,或者在 客户端主动断开连接,或者客户端异常断开连接触发
  • write : 写入事件,在需要写出数据并且缓冲区有写入位置的时候触发

Selector去建立和channel的关联,,并且监听你想关注的事件,,,当事件被触发之后,selector.select() 就会往下运行,如果没有事件发生,就会阻塞在那里,,
如果有事件发生,可以通过 selector.selectedKeys() 获取到所有的事件SelectionKey,遍历并处理这些事件,,

这个selector.selectedKeys()获取到的事件,,并不会主动移除,,需要在处理完这个事件之后,手动移除,,否则在下一次遍历事件的时候,还会再遍历一次

遇到的问题:

  • 客户端向服务端发送了大量的数据,,read()事件,去读数据的ByteBuffer大小是有限制的,,就可能会产生黏包(多个数据黏到一起,需要拆解数据)和半包(一个数据只发了一部分,需要根据另一部分组装数据),,,如果一个数据很大,设置的ByteBuffer读不完,就需要ByteBuffer扩容,,
    每一个channel都需要一个自己的buffer,,这样数据才不会乱,,就可以将buffer设置在附件中:
    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("有客户端连接了"+sc);

                    sc.configureBlocking(false);
                    // 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
                    ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件

                    SelectionKey selectionKey = sc.register(selector, 0, buffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);

当这个buffer不够用,需要扩容,扩容完了之后使用attch()放入新的附件,,attchment()获取附件

public class Server {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        ServerSocketChannel ssc = ServerSocketChannel.open();

        ssc.configureBlocking(false);

        SelectionKey sscKey = ssc.register(selector, 0, null);

        sscKey.interestOps(SelectionKey.OP_ACCEPT);


        ssc.bind(new InetSocketAddress(8080));
        System.out.println("server start ");

        while (true){
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();

                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    System.out.println("有客户端连接了"+sc);

                    sc.configureBlocking(false);
                    // 第三个参数就是 附件,,, 一个selectionKey 对应一个 附件,,,将buffer写入附件
                    ByteBuffer buffer = ByteBuffer.allocate(4); // attachment 附件

                    SelectionKey selectionKey = sc.register(selector, 0, buffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);


                }else if (key.isReadable()){
                    try {
                        //
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 拿到附件
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // -1 表示客户端断开
                        int read = channel.read(buffer);
                        if (read == -1){
                            key.cancel();
                        }else{
                            boolean isExtend = split(buffer);

                            if (isExtend){
                                // 需要扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);

                                buffer.flip();
                                // 将旧的buffer中数据,,同步到新的buffer中
                                newBuffer.put(buffer);

                                // 替换新的附件
                                key.attach(newBuffer);
                            }

//                            // 这个buffer读到最后,还是没有提取出来,,
//                            if (buffer.position() == buffer.limit()){
//                                // 需要扩容
//                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
//                                buffer.flip();
//                                // 将旧的buffer中数据,,同步到新的buffer中
//                                newBuffer.put(buffer);
//
//                                // 替换新的附件
//                                key.attach(newBuffer);
//                            }
                        }

//                        String s = Charset.defaultCharset().decode(buffer).toString();
//                        System.out.println("s = " + s);
                    } catch (IOException e) {
                        key.cancel();
                        throw new RuntimeException(e);
                    }
                }

            }
        }


    }



    private static boolean split(ByteBuffer source){
//        debugAll(source);
        boolean flag = false;
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            if (source.get(i)== '\n'){
                int pointPosition = source.position();
                int len = i+1 - pointPosition;
                ByteBuffer buffer = ByteBuffer.allocate(len);
                for (int j = 0; j < len; j++) {
                    byte b = source.get();
                    buffer.put(b);
                }
                flag = true;
                System.out.println("buffer1 = " + Charset.defaultCharset().decode(buffer));
                debugAll(buffer);
                buffer.flip();
                System.out.println("buffer2 = " + Charset.defaultCharset().decode(buffer));
//                System.out.println(Charset.defaultCharset().decode(buffer));
            }
        }

        source.compact();
        return !flag;
    }
public class Client {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();

//        sc.configureBlocking(false);

        sc.connect(new InetSocketAddress("localhost", 8080));


        SocketAddress localAddress = sc.getLocalAddress();
        sc.write(Charset.defaultCharset().encode("hello\n123131server\n"));

        new Scanner(System.in).next();

        System.in.read();

    }
}
  • 客户端正常关闭会触发read事件,导致服务端无限循环去处理这个read事件
    判断 如果read返回-1,表示没读到数据,,客户端已经关闭,,使用 cancel() 取消事件
if(key.isReadable()){
                    ByteBuffer buffer = ByteBuffer.allocate(2);

                    // 关闭客户端会触发读事件  ,,这个read会进入selectkey
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();

                        // 返回读到的字节数,,,如果返回-1 : 表示正常断开
                        int read = channel.read(buffer);
                        if (read == -1){
                            key.cancel();
                        }else {
                            buffer.flip();
//                            debugAll(buffer);
                            System.out.println(Charset.defaultCharset().decode(buffer));
                        }
                        
                    } catch (IOException e) {
                        // 异常断开
                        key.cancel();
                        throw new RuntimeException(e);
                    }
                }
  • 如果服务器发送很大的数据,,网络缓冲区一次性读不下,,就需要注册一个write事件进去,让Selector监测一旦网络缓冲区有位置了就去执行write事件
public class WriteServer {


    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);


        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);


        ssc.bind(new InetSocketAddress(8080));
        System.out.println("server start");

        while(true){
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()){

//                    key.channel()
                    // serverSocketChannel只有一个,,就是创建的那个
                    SocketChannel sc = ssc.accept();
                    System.out.println("有客户端连接了"+sc);
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);


                    // 向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }

                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 这个write并不能保证,一次性吧数据都写给客户端 ==> 返回值表示一次写了多少字节
//                    while(buffer.hasRemaining()){
//                        // 网络的缓冲区是有限制的,,写不进去了,返回就是0 ===》 这样不符合非阻塞的思想,,,只要内容没发完,就一直在循环这里卡着,,虽然能将大量的数据发送给客户端,但是效率不搞
//                        // 发送缓冲区是有限制的  ==》 不要一直卡在这里
//                        int write = sc.write(buffer);
//                        System.out.println("write = " + write);
//                    }


                    if (buffer.hasRemaining()) {
                        // 是否有剩余内容
                        // 注册写事件   ===> 必须把之前的interest加上去,,不然会把之前的事件覆盖掉
                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
//                        scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                        // 把buffer关联到selectionKey
                        scKey.attach(buffer);

                    }

                }else if(key.isWritable()){

                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();

                    int write = sc.write(buffer);
                    System.out.println("write = " + write);
//                    if (write < buffer.)

                    // 写完了,,清除buffer,不用再关注可写事件
                    if (!buffer.hasRemaining()){
                        key.attach(null);
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }


                }
            }
        }

    }
}

public class WriteClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        int count = 0;
        // 接收数据
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println("count = " + count);

            // 重置指针
            buffer.clear();
        }
    }
}

http://www.kler.cn/a/569551.html

相关文章:

  • 在 ASP.NET Core 中压缩并减少图像的文件大小
  • SQL命令详解之数据的查询操作
  • SpringBoot Maven快速上手
  • 量子关联特性的多维度探索:五量子比特星型系统与两量子比特系统的对比分析
  • Nodejs-逐行读取文件【简易版】
  • 【第十节】C++设计模式(结构型模式)-Flyweight( 享元)模式
  • Python爬虫:WebAssembly案例分析与爬取实战
  • AWS API Gateway灰度验证实现
  • Difyにおけるデータベースマイグレーション手順
  • 【爬虫基础】第二部分 爬虫基础理论 P2/3
  • 【开源-线程池(Thread Pool)项目对比】
  • 01.01 QT信号和槽
  • FastExcel vs EasyExcel vs Apache POI:三者的全面对比分析
  • Kali Linux 2024.4版本全局代理(wide Proxy)配置,适用于浏览器、命令行
  • 初阶数据结构(C语言实现)——3顺序表和链表(2)
  • React+Antd-Mobile遇到的问题记录
  • 主题爬虫(Focused Crawler)
  • 内网渗透测试-Vulnerable Docker靶场
  • 【开源免费】基于SpringBoot+Vue.JS医院药品管理系统(JAVA毕业设计)
  • 如何在Spring Boot项目中集成JWT实现安全认证?