当前位置: 首页 > article >正文

Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)

单线程Reactor

package org.example.utils.echo.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class EchoServerReactor implements Runnable{
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    EchoServerReactor() throws IOException {
        //Reactor初始化
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress("localhost",
                        8848);

        //非阻塞
        serverSocketChannel.configureBlocking(false);


        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocketChannel.register(selector,0,new AcceptorHandler());

        // SelectionKey.OP_ACCEPT
        serverSocketChannel.socket().bind(address);
        System.out.println("服务端已经开始监听:"+address);


        sk.interestOps(SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
       try {
           while (!Thread.interrupted()){
               selector.select();
               Set<SelectionKey>  selected=selector.selectedKeys();
               Iterator<SelectionKey> it=selected.iterator();
               while (it.hasNext()){
                   SelectionKey sk=it.next();
                   dispatch(sk);
               }
               selected.clear();
            }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
    }

    private void dispatch(SelectionKey sk) {
        Runnable handler=(Runnable) sk.attachment();
        if (handler!=null){
            handler.run();
        }
    }

    class AcceptorHandler implements Runnable{


        @Override
        public void run() {
            try {
                SocketChannel channel=serverSocketChannel.accept();
                if (channel!=null)
                    new EchoHandler(selector,channel);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}
package org.example.utils.echo.single;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class EchoHandler implements Runnable{
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
    static final int RECIEVING=0,SENDING=1;
    int state=RECIEVING;

    EchoHandler(Selector selector,SocketChannel c) throws IOException {
        channel=c;
        c.configureBlocking(false);
        sk=channel.register(selector,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        try {
            if (state==SENDING){
                channel.write(byteBuffer);
                byteBuffer.clear();
                sk.interestOps(SelectionKey.OP_READ);
                state=RECIEVING;
            }else if (state==RECIEVING){
                int length=0;
                while ((length=channel.read(byteBuffer))>0)
                {
                    System.out.println(new String(byteBuffer.array(),0,length));
                }
                byteBuffer.flip();
                sk.interestOps(SelectionKey.OP_WRITE);
                state=SENDING;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

结果:

原理无非就是:

多线程,无非就是搞多个Reactor   ,   一个专门接受accept  ,  一个专门dispatch ,  再搞一个多线程池处理handle

这里面最主要的就是

handle类,sk.attach(this);把对象传回reactor

参考文献:

java高并发核心编程. 卷1,NIO、Netty、Redis、ZooKeeper  (尼恩)


http://www.kler.cn/news/162355.html

相关文章:

  • 时间复杂度为 O(n^2) 的排序算法 | 京东物流技术团队
  • 智能优化算法应用:基于蜣螂算法无线传感器网络(WSN)覆盖优化 - 附代码
  • 批量免费AI写作工具,批量免费AI写作软件
  • 华为电视盒子 EC6108V9C 刷机成linux系统
  • 【推荐系统】推荐算法数学基础
  • 【C++】:STL源码剖析之vector类容器的底层模拟实现
  • Theamleaf导出pdf模版编写(原始th/td编写表格)
  • 前端:HTML+CSS+JavaScript实现轮播图2
  • 网络运维与网络安全 学习笔记2023.12.1
  • 设计图中时序图
  • 搞懂HashTable, HashMap, ConcurrentHashMap 的区别,看着一篇就足够了!!!
  • API成批分配漏洞介绍与解决方案
  • 游戏策划常用的ChatGPT通用提示词模板
  • vue实现页面之间的el-select同步数据选项
  • 【大数据】HBase 中的列和列族
  • 【数据结构】字典树(Trie树)算法总结
  • pydantic的基础用法
  • STM32-OLED显示屏
  • 2023 金砖国家职业技能大赛网络安全省赛理论题样题(金砖国家未来技能挑战赛)
  • 基于Java酒店管理系统
  • DedeCms后台文章列表文档id吗?或者快速定位id编辑文章
  • 【Node.js】基础梳理 6 - MongoDB
  • 安全快速地删除 MySQL 大表数据并释放空间
  • 微信小程序 - 创建 ZIP 压缩包
  • Termux
  • VIT总结
  • 算法学习—排序
  • 用网安技术去合法挖漏洞,一个月能拿多少钱?想不到吧!
  • NVMe Over Fabrics with iRDMA总结 - 1
  • WT2605-24SS录放音语音芯片:便捷按键功能提升用户体验