当前位置: 首页 > article >正文

NIO | 什么是Java中的NIO —— 结合业务场景理解 NIO (二)

实时通信应用 的主流技术 并非NIO , 整理本文的目的是 更好的理解 NIO

在现代的 即时聊天应用中,使用 WebSocket、MQTT 或 SignalR 等协议更为普遍。

若您想了解 当前主流的有关 实时通信应用 的技术 , 请移步他文。

(一) 业务场景

实时通信应用(如即时聊天系统)通常要求能够 同时管理 多个客户端连接,并在客户端之间实时交换消息。每个消息的发送和接收都需要立即响应,因此要求系统能够快速非阻塞地处理大量的网络连接。

(二) NIO 解决方案

利用 NIO 的 SelectorSocketChannel,一个单独的线程 就可以管理 多个客户端连接,并在收到消息时立即响应。通过非阻塞 I/O 和选择器,系统能够高效地管理客户端连接,并且不会因等待某个客户端的响应而阻塞其他客户端。

示例: 构建一个简单的基于 NIO 的聊天服务器,能够同时处理多个客户端的消息:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.net.*;

public class ChatServer {

    public static void main(String[] args) throws IOException {
        // 启动聊天服务器
        startServer();
    }

    // 启动服务器的具体实现,提取为一个独立的方法
    public static void startServer() throws IOException {
        // 创建一个 ServerSocketChannel 用于监听客户端的连接请求
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定到指定端口 9090
        serverSocketChannel.bind(new InetSocketAddress(9090));
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);

        // 创建一个 Selector 用于多路复用 I/O 事件
        Selector selector = Selector.open();
        // 将 serverSocketChannel 注册到 selector 上,监听客户端的连接请求
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 进入事件处理循环
        while (true) {
            // 阻塞,直到有 I/O 事件发生
            selector.select();

            // 遍历所有已选择的事件
            for (SelectionKey key : selector.selectedKeys()) {
                // 如果是客户端连接请求事件
                if (key.isAcceptable()) {
                    // 接受客户端连接
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);  // 设置为非阻塞模式
                    // 将客户端通道注册到 selector 上,监听客户端的读操作
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
                // 如果是可读事件(客户端发送了消息)
                else if (key.isReadable()) {
                    // 获取客户端的通道
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    // 创建缓冲区来接收数据
                    ByteBuffer buffer = ByteBuffer.allocate(256);
                    // 读取客户端发送的数据
                    int bytesRead = clientChannel.read(buffer);
                    if (bytesRead == -1) {
                        // 如果客户端关闭连接,关闭其通道
                        clientChannel.close();
                    } else {
                        buffer.flip();  // 准备缓冲区读取
                        // 将接收到的消息广播到其他客户端
                        broadcastMessage(selector, buffer);
                    }
                }
            }
            // 清空已处理的事件
            selector.selectedKeys().clear();
        }
    }

    // 广播消息给所有客户端
    private static void broadcastMessage(Selector selector, ByteBuffer buffer) {
        // 遍历 selector 上的所有通道
        for (SelectionKey key : selector.keys()) {
            // 仅处理有效且可写的通道
            if (key.isValid() && key.isWritable()) {
                // 获取通道
                SocketChannel channel = (SocketChannel) key.channel();
                try {
                    // 重置缓冲区的位置和限制,确保每个通道写入正确的数据
                    buffer.flip();
                    channel.write(buffer);
                    buffer.clear();  // 清空缓冲区,准备下次读取
                } catch (IOException e) {
                    e.printStackTrace();  // 异常处理
                }
            }
        }
    }
}

(三) 测试类代码

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;

import static java.lang.Thread.sleep;

public class ChatServerTest {

    private static final int PORT = 9090;

    public static void main(String[] args) {
        // 启动服务器线程
        Thread serverThread = new Thread(() -> {
            try {
                // 启动聊天服务器
                ChatServer.startServer();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        // 启动服务器
        serverThread.start();

        try {
            // 等待服务器启动
            Thread.sleep(100); // 这里可以根据实际情况调整等待时间
            System.out.println("Server started successfully!");

            // 创建并连接客户端1
            SocketChannel client1 = SocketChannel.open();
            client1.connect(new InetSocketAddress("localhost", PORT));
            System.out.println("Client1 connected");

            // 创建并连接客户端2
            SocketChannel client2 = SocketChannel.open();
            client2.connect(new InetSocketAddress("localhost", PORT));
            System.out.println("Client2 connected");

            // 向服务器发送消息
            String message = "Hello from Client1!";
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
            client1.write(buffer);
            System.out.println("Client1 sent message: " + message);

            // 读取客户端2的消息
            buffer.clear();  // 清空缓冲区以准备接收新数据
            int bytesRead = client2.read(buffer);  // 从客户端2读取数据到缓冲区
            if (bytesRead == -1) {
                client2.close();  // 如果客户端关闭连接,关闭其通道
            } else if (bytesRead > 0) {
                buffer.flip();  // 切换为读取模式
                String receivedMessage = new String(buffer.array(), 0, buffer.limit());
                System.out.println("Client2 received message: " + receivedMessage);
            }

            // 关闭客户端连接
            client1.close();
            client2.close();
            System.out.println("Clients disconnected");

            // 让服务器继续运行一段时间
            Thread.sleep(2000);
            System.out.println("Test completed successfully!");

        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

        // 结束测试后停止服务器线程
        serverThread.interrupt();
    }
}

1. 简要说明

  • ChatServerTest 类用于模拟客户端与服务器的交互。它启动一个聊天服务器线程并创建两个模拟客户端进行通信。
  • ChatClientSimulator 类模拟客户端行为:连接到服务器,发送消息,并接收来自服务器的消息。

待解决:

目前在测试方法中 执行到

int bytesRead = client2.read(buffer);  // 从客户端2读取数据到缓冲区

就不在执行了


http://www.kler.cn/a/518409.html

相关文章:

  • 系统思考—问题分析
  • 科技快讯 | 理想官宣:正式收费!WeChat 港币钱包拓宽商户网络;百川智能发布深度思考模型Baichuan-M1-preview
  • websocket实现
  • leetcode28-找出字符串中第一个匹配的下标
  • 【python】四帧差法实现运动目标检测
  • 【安当产品应用案例100集】034-安当KSP支持密评中存储数据的机密性和完整性
  • FPGA实现光纤通信(3)——光纤8b/10b编码数据回环
  • [C++技能提升]类注册
  • 大数据k-means聚类算法:基于k-means聚类算法+NLP微博舆情数据爬虫可视化分析推荐系统
  • FireCrawl开源 AI 网络爬虫工具,自动爬取网站及子页面内容,预处理为结构化数据
  • JVM面试题解,垃圾回收之“分代回收理论”剖析
  • Day109 MySQL深入及优化
  • 前端三件套之CSS
  • 基于机器学习链家网房屋数据分析预测系统的设计与实现
  • 基于 Node.js 的天气查询系统实现(附源码)
  • SSM电子商城系统
  • 第20篇:Python 开发进阶:使用Django进行Web开发详解
  • rust如何定义全局对象变量
  • 如何成为一名LLM(大语言模型)工程师
  • 基于Flask的哔哩哔哩评论数据可视化分析系统的设计与实现
  • 亲测有效!解决PyCharm下PyEMD安装报错 ModuleNotFoundError: No module named ‘PyEMD‘
  • C++----STL(list)
  • C语言复习
  • 今何在:“思索答案就是一种对虚无的战斗”
  • 基于Springboot + vue实现的民俗网
  • 深度强化学习:PPO