当前位置: 首页 > article >正文

Scalable Io-NIO实践

Scalable Io-NIO实践

上一章我们具体讲解了NIO的实现原理,这一章我们来自己手写一个Reactor
您可以仔细看下代码,都写了具体的注释

1.Reactor

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class Reactor {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    Reactor() throws IOException {
        //打开选择器
        selector = Selector.open();
        //打开服务器通道
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        //开始监听本地8080端口号信息
        serverSocket.bind(new InetSocketAddress("127.0.0.1", 8080));
        //设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //设置模式为接收
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //绑定一个通道的具体处理事件
        key.attach(new Acceptor());
    }
    public void run(){
        try{
            while(!Thread.interrupted()){
                //阻塞,获取有消息的通道
                selector.select();
                //获取通道集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while(iterator.hasNext()){
                    //开始给具体通道分配任务
                    dispatch(iterator.next());
                }
            }
        }catch (Exception e){}
    }

    public void dispatch(SelectionKey key){
        Runnable r = (Runnable)key.attachment();
        if(r != null){
            r.run();
        }
    }

    class Acceptor implements Runnable{
        @Override
        public void run() {
            try {
                SocketChannel c=serverSocketChannel.accept();
                if(c != null){
                    new Handler(selector,c);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
//工具类,来处理通道事件
    final class Handler implements Runnable{
        final SocketChannel socket;
        final SelectionKey sk;
        //申请缓冲区
        ByteBuffer input=ByteBuffer.allocate(1024);
        ByteBuffer output=ByteBuffer.allocate(1024);
        //定义处理状态
        static final int READING=0,SENDING=1;
        int state=READING;
        Charset charset = Charset.forName("UTF-8");
        Handler(Selector sel,SocketChannel c) throws IOException {
            socket=(SocketChannel) c.configureBlocking(false);
            sk=socket.register(sel,0);
            sk.attach(this);//SelectionKeys绑定Handler,官方叫attach
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }
        @Override
        public void run() {
            try{
                if(state==READING){
                    read();
                }else if(state==SENDING) {
                    send();
                }
            }catch (IOException e){
                System.out.println(e.getMessage());
            }
        }
		//读取通道内的数据并打印出来,再将状态改为发送态
        void read() throws IOException {
            int read = socket.read(input);
            byte[] array = input.array();
            String s=new String(array,0,read,charset);
            System.out.println(s);
            input.clear();
            state=SENDING;
            sk.interestOps(SelectionKey.OP_WRITE);
        }
		//向通道发送消息
        void send() throws IOException {
            output.put("服务端发送消息".getBytes());
            output.flip();//必须
            socket.write(output);
            output.clear();
            sk.cancel();
        }
    }
}

2.测试类

public class TestReactor {
    public static void main(String[] args) throws IOException {
        Reactor reactor = new Reactor();
        reactor.run();
    }
}

3.客户端

class NIO2Client{
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("客户端已启动");
        //获取Selector
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector,SelectionKey.OP_READ);

        // 发送消息到服务器
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("my name is Lordyi".getBytes());
        buffer.flip();  // 切换到写模式
        socketChannel.write(buffer);

        // 清空buffer,准备接收服务器响应
        buffer.clear();
        Charset charset = Charset.forName("UTF-8");

        while(true){
            //阻塞拿到具体的通道
            selector.select();
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while(selectionKeys.hasNext()){
                SelectionKey selectionKey = selectionKeys.next();
                //当通道内有数据并处于可读状态时
                if(selectionKey.isReadable()){
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    buffer.clear();
                    //将数据放入缓冲区
                    int read = channel.read(buffer);
                    if(read==-1) return;
                    buffer.flip();
                    //进行Decode解码将二进制转换为字符串
                    byte[] array = buffer.array();
                    String s = new String(array, 0, read, charset);
                    System.out.println(s);
                }
                //移除此通道
                selectionKeys.remove();
            }
        }

    }
}

4.演示结果

在这里插入图片描述

在这里插入图片描述

参考文献———Scalable Io in JAVA-Doug Lea
State University of New York at Oswego


http://www.kler.cn/a/445281.html

相关文章:

  • 如何使用 WebAssembly 扩展后端应用
  • OpenGL ES 01 渲染一个四边形
  • K8s 节点 NotReady 后 Pod的变化
  • [Unity] 【VR】【游戏开发】在VR中使用New Input System获取按键值的完整教程
  • Vulhub:Redis[漏洞复现]
  • imx6ull qt多页面控制系统(正点原子imx系列驱动开发)
  • 使用 DeepSpeed 微调 OPT 基础语言模型
  • 【新版】阿里云ACP大数据工程师模拟试题(含答案解析)
  • wepack的各个版本差异?
  • 生产环境kafka升级过程
  • RadiAnt DICOM - 基本主题 :从 PACS 服务器打开研究
  • 彻底理解如何优化接口性能
  • 【Python】Selenium模拟滚动鼠标,向下拖动下拉按钮,直至网页页面向下滑的方法
  • vue3+vite 引入动画组件库 Inspira UI
  • Python机器学习算法KNN、MLP、NB、LR助力油气钻井大数据提速参数优选及模型构建研究...
  • flask-admin+Flask-WTF 实现实现增删改查
  • HTMLCSS:酷炫的3D开关控件
  • 设计模式详解(十一):模板方法——Template Method
  • 数字化供应链:背景特点
  • <论文>初代GPT长什么样?
  • es-head安装使用以及常见问题
  • Spring框架(1)——IOC(控制权反转)的实现
  • 深度比较:OpenNI2 SDK与Orbbec SDK的功能、优势和选择指南
  • parquet类型小文件合并
  • ESP32单片机开发
  • uniApp上传文件踩坑日记