当前位置: 首页 > article >正文

NIO 和 Netty 在 Spring Boot 中的集成与使用

Netty到底是个啥,有啥子作用

1. Netty 的本质:对 NIO 的封装

  • NIO 的原生问题

    • Java 的 NIO 提供了非阻塞 I/O 和多路复用机制,但其使用较为复杂(如 Selector、Channel、Buffer 的配置和管理)。
    • 开发者需要自己处理线程模型、资源管理、协议解析等底层细节,代码冗长且容易出错。
  • Netty 的改进

    • Netty 对 NIO 进行了高级封装,提供了更加易用的 API 和灵活的抽象层,例如:
      • Channel:封装网络连接。
      • Pipeline:处理器链,支持多种 Handler 对 I/O 事件的顺序处理。
      • EventLoopGroup:管理 I/O 线程,优化线程池模型。
      • 零拷贝:通过 FileRegion 等机制提升文件传输性能。
    • 通过这些封装,Netty 让开发者可以专注于业务逻辑,而无需过多关注底层实现。

2. Netty 的定位:搭建各种协议的网络框架

Netty 的核心价值在于它不是一个具体的协议实现,而是一个通用的网络框架,你可以用它来搭建支持任意协议的服务器或客户端:

  • HTTP/HTTPS:Netty 提供了 HttpServerCodecHttpObjectAggregator 等,简化 HTTP 服务器开发。
  • WebSocket:通过 WebSocketServerProtocolHandler 支持 WebSocket 协议的握手和数据帧处理。
  • 自定义协议:可以使用 Netty 提供的 ByteBuf 和编解码器开发任意的私有协议。
  • 其他协议:支持基于 TCP、UDP 的多种网络协议(如 FTP、MQTT、RPC 框架等)。

因此,学习 Netty 的核心目标是:基于其框架搭建一个能够满足你业务需求的服务器或客户端,这既可能是基于已有协议(如 HTTP),也可能是自定义协议。


3. Netty 与 Spring 的关系:与 Tomcat 并列

Netty 和 Spring 中的 Tomcat、Jetty、Undertow 等容器确实属于同一层级的网络层组件,它们的关系可以这样理解:

  • Tomcat/Jetty/Undertow(传统 Servlet 容器)

    • 专注于基于 Servlet API 的 HTTP 协议处理。
    • 与 Spring WebMVC、Spring Boot 配合紧密,默认用于运行 REST API 或 Web 应用。
    • 通常是阻塞 I/O 模型(Tomcat 提供了异步支持,但整体设计仍然偏向阻塞)。
  • Reactor Netty(Spring WebFlux 默认容器)

    • Netty 的一个封装版本,专注于响应式编程模型,支持非阻塞、高并发 HTTP/WebSocket 服务。
    • 与 Spring WebFlux 配合,简化了直接使用 Netty 时的一些底层复杂性。
    • 你不需要自己搭建 ServerBootstrap,但可以通过 WebFlux 直接调用 Netty 的异步能力。
  • 原生 Netty

    • 灵活且强大,可以搭建支持任意协议的高性能服务器或客户端。
    • 如果需要自定义协议(如 RPC、物联网通信协议)、或处理特殊场景(如 TCP 长连接、UDP 消息),原生 Netty 是理想选择。
    • 它可以与 Spring Boot 并行运行,比如 Spring Boot 提供 HTTP API,而 Netty 提供 WebSocket、TCP 或自定义协议服务。

4. 学习 Netty 的本质:搭建服务器(或客户端)

学习 Netty 的核心目标确实是“搭建一个服务器或客户端”。以下是几个关键点:

  • 事件驱动的 I/O 模型

    • 理解 Netty 的 ChannelEventLoopSelector 是如何通过事件驱动模型处理网络 I/O 的。
    • 理解 ChannelPipelineHandler 的作用,以及如何通过责任链处理 I/O 事件(如读、写、异常等)。
  • 线程模型的灵活性

    • 学习如何使用 EventLoopGroup 来管理线程池(如 bossGroup 处理连接,workerGroup 处理读写)。
    • 掌握 Netty 的多线程模型,避免传统阻塞 I/O 中的线程瓶颈。
  • 编解码与协议支持

    • 学习如何使用 Netty 提供的编解码器(如 ByteToMessageDecoderMessageToByteEncoder)来解析和组装协议数据。
    • 通过 HttpServerCodec 等现成工具快速支持标准协议,也可以自己开发定制化的协议栈。
  • 高性能文件传输

    • 利用 Netty 的零拷贝(FileRegion)和分块传输(ChunkedWriteHandler)提升文件传输效率。
    • 在需要时手动控制传输细节,比如断点续传、多连接并行下载等。
  • 典型场景

    • 构建一个支持高并发的 WebSocket 服务器,用于聊天、通知推送等。
    • 构建基于 Netty 的 RPC 框架,支持分布式调用。
    • 使用 Netty 搭建高效的 TCP 或 UDP 服务(如物联网设备通信)。
    • 开发支持复杂协议(如 MQTT、FTP、SMTP 等)的服务器或客户端。

5. 是否每次都需要搭建一个服务器?

不一定完全是“搭建服务器”。Netty 本身既可以用来搭建服务器(ServerBootstrap),也可以用来构建客户端(Bootstrap),具体取决于你的应用场景:

  • 搭建服务器
    这是 Netty 的典型用途,例如构建 HTTP、WebSocket 或其他协议服务器。学习 Netty 的一个重要方向就是学会如何编写 ServerBootstrap 并配置它的 ChannelInitializerPipeline

  • 构建客户端
    Netty 也可以用来构建高性能的网络客户端。例如:

    • RPC 调用中的客户端通信。
    • 基于 Netty 的 WebSocket 客户端(如 IM、实时通知客户端)。
    • 支持自定义协议的客户端程序。

一、NIO 的基础与在 Spring Boot 中的集成

1.1 NIO 是什么?为什么要有 NIO?

NIO 的背景

Java NIO(New Input/Output)是 Java 1.4 引入的一组新的 I/O 操作 API,旨在解决传统 BIO(Blocking I/O)在高并发场景下的性能瓶颈。BIO 的模型在处理大量并发连接时存在明显的性能问题,而 NIO 通过引入非阻塞 I/O 和多路复用技术,显著提升了 I/O 操作的效率。

BIO 的局限性:

  • 线程开销大:每个连接需要一个独立的线程进行处理,当连接数增加时,线程数量会急剧增加,导致系统资源消耗过大。

  • 阻塞操作:在 BIO 模型中,I/O 操作是阻塞的,即线程在执行读写操作时会一直等待,直到数据准备好或操作完成。这种阻塞会导致大量线程处于等待状态,浪费 CPU 资源。

  • 扩展性差:由于线程数量的限制,BIO 模型难以应对高并发的场景,尤其是在网络通信中,连接数可能达到数千甚至数万。

NIO 的特点与优势

  • 非阻塞 I/O:NIO 的核心特性之一是非阻塞 I/O。通过 Selector 机制,NIO 可以实现事件驱动的 I/O 操作。一个线程可以管理多个通道(Channel),只有在通道上有事件(如数据可读、可写)发生时,线程才会进行处理,避免了线程的阻塞等待。

  • 多路复用:NIO 使用 Selector 实现多路复用,即一个线程可以同时监控多个通道的 I/O 事件。这种方式大大减少了线程的数量,降低了系统的资源消耗,同时提高了系统的并发处理能力。

  • 缓冲区模型:NIO 引入了 Buffer 作为数据读写的核心组件。与传统的流式 I/O 不同,NIO 使用缓冲区来批量处理数据,减少了频繁的系统调用,提高了 I/O 操作的效率。

1.2 NIO 的基础组件、它们之间的联系与工作流程

NIO 的核心组件包括 Channel(通道)Buffer(缓冲区) 和 Selector(选择器)。这些组件共同协作,实现了非阻塞 I/O 和多路复用机制。下面我们将详细讲解这些组件的作用、联系以及它们的工作流程。


1.2.1 Channel(通道)

Channel 是 NIO 中用于数据传输的抽象,类似于传统 I/O 中的流(Stream),但 Channel 是双向的,既可以读数据,也可以写数据。与传统的流相比,Channel 提供了更高的灵活性和性能。

1.2.1.1 对比传统流

  • 传统流:传统的 InputStream 和 OutputStream 是单向的,只能读或写。

  • Channel:Channel 是双向的,既可以读数据,也可以写数据。例如,SocketChannel 可以同时进行读写操作。

1.2.1.2 常见类型

  • ServerSocketChannel:用于监听 TCP 连接,类似于 ServerSocket

  • SocketChannel:用于 TCP 网络通信,类似于 Socket

  • FileChannel:用于文件 I/O 操作,支持文件的读写和内存映射。

1.2.1.3 获取 Channel

  • 通过 open() 方法获取 Channel:

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    SocketChannel socketChannel = SocketChannel.open();
    FileChannel fileChannel = FileChannel.open(Paths.get("file.txt"), StandardOpenOption.READ);
  • 通过 RandomAccessFile 获取 Channel:

    RandomAccessFile file = new RandomAccessFile("file.txt", "rw");
    FileChannel fileChannel = file.getChannel();

1.2.2 Buffer(缓冲区)

Buffer 是 NIO 中用于存储数据的容器,所有的数据读写操作都是通过 Buffer 进行的。Buffer 提供了一种高效的方式来批量处理数据,避免了传统流式 I/O 的低效问题。

1.2.2.1 核心类:ByteBuffer

  • ByteBuffer 是最常用的缓冲区类型,用于存储字节数据。

  • 其他类型的 Buffer:

    • CharBuffer:存储字符数据。

    • IntBuffer:存储整数数据。

    • LongBuffer:存储长整数数据。

1.2.2.2 读写数据必须经过 Buffer

  • 写入数据:将数据写入 Buffer,然后通过 Channel 发送。

  • 读取数据:从 Channel 读取数据到 Buffer,然后从 Buffer 中读取数据。

1.2.2.3 核心方法

  • put():将数据写入 Buffer。

  • get():从 Buffer 中读取数据。

  • flip():将 Buffer 从写模式切换到读模式。

  • clear():清空 Buffer,准备重新写入数据。

  • rewind():将 position 重置为 0,可以重新读取 Buffer 中的数据。

1.2.2.4 Buffer 的状态属性

  • capacity:缓冲区的容量,即可以存储的最大数据量。

  • position:当前读写的位置。

  • limit:缓冲区的限制位置,表示可以读写的数据范围。

  • mark:标记位置,用于临时记录 position。


1.2.3 Selector(选择器)

Selector 是 NIO 实现多路复用的核心组件,它可以监控多个 Channel 的 I/O 事件(如读、写、连接等)。通过 Selector,一个线程可以管理多个 Channel,从而减少线程数量,提高系统的并发处理能力。

1.2.3.1 基于操作系统的多路复用机制

  • Selector 底层依赖于操作系统的多路复用机制(如 Linux 的 epoll、Windows 的 IOCP)。

  • 通过 select() 方法,Selector 可以查询哪些 Channel 已经就绪(如可读、可写、可连接等)。

1.2.3.2 事件类型

  • OP_ACCEPT:表示 Channel 可以接受新的连接(适用于 ServerSocketChannel)。

  • OP_READ:表示 Channel 有数据可读。

  • OP_WRITE:表示 Channel 可以写入数据。

  • OP_CONNECT:表示 Channel 已经成功连接。

1.2.3.3 注册 Channel 到 Selector

  • 通过 register() 方法将 Channel 注册到 Selector,并指定感兴趣的事件:

    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    socketChannel.register(selector, SelectionKey.OP_READ);

1.2.4 工作流程简述

NIO 的工作流程可以分为以下几个步骤:

1.2.4.1 初始化

打开 ServerSocketChannel,并配置为非阻塞模式:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);

打开 Selector

Selector selector = Selector.open();

将 ServerSocketChannel 注册到 Selector,并指定感兴趣的事件(如 OP_ACCEPT):

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

1.2.4.2 事件循环

调用 selector.select() 方法,阻塞等待事件发生,轮询就绪事件:

selector.select();

获取就绪的 SelectionKey 集合:

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();

1.2.4.3 事件处理

处理新连接(OP_ACCEPT)

  • 调用 accept() 方法接受新连接,并注册到 Selector:

    if (key.isAcceptable()) {
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

处理读事件(OP_READ)

  • 从 Channel 读取数据到 Buffer:

    if (key.isReadable()) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            // 处理读取的数据
        }
    }

处理写事件(OP_WRITE)

  • 将数据从 Buffer 写入 Channel:

    if (key.isWritable()) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.wrap("Hello, World!".getBytes());
        socketChannel.write(buffer);
    }

1.2.4.4 Buffer 读写

  • 读操作:将数据从 Channel 读入 Buffer。

  • 写操作:将数据从 Buffer 写回 Channel。


1.2.5 组件之间的联系

  • Channel 与 Buffer:Channel 负责数据的传输,Buffer 负责数据的存储。所有的读写操作都必须通过 Buffer 进行。

  • Channel 与 Selector:Channel 注册到 Selector 后,Selector 可以监控 Channel 的 I/O 事件,并在事件发生时通知应用程序。

  • Selector 与 Buffer:Selector 负责管理多个 Channel 的事件,而 Buffer 负责存储这些事件中涉及的数据。

1.3 NIO 在 Spring Boot 中的集成与使用

在 Spring Boot 中集成 NIO 可以帮助我们构建高性能的网络服务和文件操作功能。下面我们将详细讲解如何在 Spring Boot 中编写简易的 NIO 服务,以及如何使用 NIO 进行文件操作,并结合实际项目场景进行代码实现和详细解释。


1.3.1 在 Spring Boot 中编写简易 NIO 服务

1.3.1.1 目标

  • 实现一个基于 NIO 的简易 Echo 服务器。

  • 将 NIO 服务集成到 Spring Boot 项目中。

  • 使用线程池管理 NIO 事件循环,避免阻塞 Spring Boot 主线程。

1.3.1.2 实现步骤

  • 创建 Spring Boot 项目

    • 使用 Spring Initializr 创建一个 Spring Boot 项目,添加 spring-boot-starter-web 依赖。

  • 编写 NIO 服务

    • 在 Spring Boot 中编写 NIO 服务,并将其作为一个 @Component 或 @Service 组件。

  • 启动 NIO 服务

    • 在 Spring Boot 启动时初始化 NIO 服务。

  • 测试 NIO 服务

    • 使用 Telnet 或 Netcat 工具测试 NIO 服务。


1.3.1.3 关键代码与详细解释

以下是一个完整的 Spring Boot 项目示例,展示了如何集成 NIO 服务。

1. Spring Boot 主类

@SpringBootApplication
public class NioSpringBootApplication {
    public static void main(String[] args) {
        SpringApplication.run(NioSpringBootApplication.class, args);
    }
}
  • 说明:这是 Spring Boot 的启动类,用于启动应用程序。


2. NIO 服务组件

import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class NioEchoServer {

    @PostConstruct
    public void start() throws IOException {
        // 1. 创建 ServerSocketChannel 并绑定端口
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080)); // 绑定到 8080 端口
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式

        // 2. 创建 Selector
        Selector selector = Selector.open();

        // 3. 将 ServerSocketChannel 注册到 Selector,监听 OP_ACCEPT 事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 4. 使用线程池管理事件循环
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            try {
                while (true) {
                    // 5. 阻塞等待事件发生
                    selector.select();

                    // 6. 获取就绪的事件集合
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectedKeys.iterator();

                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();

                        // 7. 处理新连接事件
                        if (key.isAcceptable()) {
                            handleAccept(serverSocketChannel, selector);
                        }

                        // 8. 处理读事件
                        if (key.isReadable()) {
                            handleRead(key);
                        }

                        // 9. 移除已处理的事件
                        iter.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    // 处理新连接事件
    private void handleAccept(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept(); // 接受新连接
        socketChannel.configureBlocking(false); // 设置为非阻塞模式
        socketChannel.register(selector, SelectionKey.OP_READ); // 注册到 Selector,监听 OP_READ 事件
        System.out.println("New client connected: " + socketChannel.getRemoteAddress());
    }

    // 处理读事件
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配缓冲区

        // 读取数据
        int bytesRead = socketChannel.read(buffer);
        if (bytesRead > 0) {
            buffer.flip(); // 切换为读模式
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data); // 将数据从缓冲区读取到字节数组
            String message = new String(data);
            System.out.println("Received: " + message);

            // 回显数据
            ByteBuffer responseBuffer = ByteBuffer.wrap(data);
            socketChannel.write(responseBuffer); // 将数据写回客户端
        } else if (bytesRead == -1) {
            // 客户端关闭连接
            System.out.println("Client disconnected: " + socketChannel.getRemoteAddress());
            socketChannel.close();
        }
    }
}

1.3.1.4 关键代码详解

  • ServerSocketChannel 的创建与配置

    • ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();:创建一个 ServerSocketChannel

    • serverSocketChannel.bind(new InetSocketAddress(8080));:绑定到 8080 端口。

    • serverSocketChannel.configureBlocking(false);:设置为非阻塞模式。

  • Selector 的创建与注册

    • Selector selector = Selector.open();:创建一个 Selector

    • serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);:将 ServerSocketChannel 注册到 Selector,监听 OP_ACCEPT 事件。

  • 事件循环

    • selector.select();:阻塞等待事件发生。

    • Set<SelectionKey> selectedKeys = selector.selectedKeys();:获取就绪的事件集合。

    • Iterator<SelectionKey> iter = selectedKeys.iterator();:遍历事件集合。

  • 处理新连接事件

    • SocketChannel socketChannel = serverSocketChannel.accept();:接受新连接。

    • socketChannel.configureBlocking(false);:设置为非阻塞模式。

    • socketChannel.register(selector, SelectionKey.OP_READ);:注册到 Selector,监听 OP_READ 事件。

  • 处理读事件

    • ByteBuffer buffer = ByteBuffer.allocate(1024);:分配缓冲区。

    • int bytesRead = socketChannel.read(buffer);:读取数据到缓冲区。

    • buffer.flip();:切换为读模式。

    • socketChannel.write(responseBuffer);:将数据写回客户端。

  • 线程池管理

    • ExecutorService executorService = Executors.newSingleThreadExecutor();:创建一个单线程的线程池。

    • executorService.submit(() -> { ... });:提交任务,避免阻塞 Spring Boot 主线程。


1.3.1.5 测试 NIO 服务

  1. 启动 Spring Boot 项目。

  2. 使用 Telnet 或 Netcat 连接到 NIO 服务器:

    telnet localhost 8080
  3. 输入消息,服务器会将消息原样返回。


1.3.2 NIO 文件操作

1.3.2.1 目标

  • 使用 FileChannel 和 MappedByteBuffer 实现高效的文件读写。

  • 在 Spring Boot 中提供文件上传和下载接口。

1.3.2.2 实现步骤

  • 使用 FileChannel 读写文件

    • 通过 FileChannel 实现文件的随机访问和直接写入。

  • 使用 MappedByteBuffer 提高读写效率

    • 将文件映射到内存中,减少系统调用。

  • 在 Spring Boot 中提供文件上传和下载接口

    • 使用 @RestController 实现文件上传和下载功能。


1.3.2.3 关键代码与详细解释

以下是一个完整的 Spring Boot 项目示例,展示了如何使用 NIO 进行文件操作。

1. 文件上传接口

import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

@RestController
@RequestMapping("/file")
public class FileController {

    @PostMapping("/upload")
    public String uploadFile(@RequestParam("file") MultipartFile file) throws IOException {
        // 1. 获取文件路径
        String filePath = "uploads/" + file.getOriginalFilename();

        // 2. 使用 FileChannel 写入文件
        try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.wrap(file.getBytes()); // 将文件内容写入缓冲区
            fileChannel.write(buffer); // 将缓冲区内容写入文件
        }

        return "File uploaded: " + file.getOriginalFilename();
    }
}
  • 说明

    • MultipartFile file:接收上传的文件。

    • FileChannel.open():打开文件通道。

    • ByteBuffer.wrap():将文件内容包装到缓冲区。

    • fileChannel.write():将缓冲区内容写入文件。


2. 文件下载接口

import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.file.Paths;

@RestController
@RequestMapping("/file")
public class FileController {

    @GetMapping("/download")
    public ResponseEntity<Resource> downloadFile(@RequestParam String filename) {
        // 1. 获取文件路径
        String filePath = "uploads/" + filename;

        // 2. 创建 FileSystemResource
        FileSystemResource resource = new FileSystemResource(Paths.get(filePath));

        // 3. 返回文件内容
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
                .body(resource);
    }
}
  • 说明

    • FileSystemResource:表示文件系统资源。

    • ResponseEntity:返回文件内容,并设置响应头。


3. 使用 MappedByteBuffer 提高读写效率

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileService {

    public void readFileWithMappedBuffer(String filePath) throws IOException {
        try (FileChannel fileChannel = FileChannel.open(Paths.get(filePath), StandardOpenOption.READ)) {
            // 1. 将文件映射到内存
            MappedByteBuffer mappedBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

            // 2. 读取文件内容
            byte[] data = new byte[mappedBuffer.remaining()];
            mappedBuffer.get(data);
            System.out.println(new String(data));
        }
    }
}
  • 说明

    • fileChannel.map():将文件映射到内存中。

    • MappedByteBuffer:直接操作内存中的数据,提高读写效率。

2. Netty 基础

2.1 Netty 是什么?与 NIO 之间的联系

2.1.1 Netty 的定义

Netty 是一个基于 Java NIO 的异步事件驱动网络应用框架,专注于高性能和可扩展的网络通信应用开发。它封装了底层 NIO 的复杂性,使开发者无需直接处理 SelectorChannelBuffer 等低级 API,从而简化网络通信的实现。

Netty 的应用场景广泛,适用于:

  • 实现自定义 TCP/UDP 协议服务;
  • 开发高性能的 HTTP 服务器和客户端;
  • 构建分布式系统的通信模块。

2.1.2 Netty 与 NIO 的关系

Netty 基于 NIO,继承了其核心特性,同时进一步简化和优化了编程模型,提供了更加易用且功能强大的 API。

NIO 特性Netty 封装
SelectorEventLoopGroup:管理多路复用的事件循环线程组
ChannelChannel:扩展的网络通道接口,支持异步操作
BufferByteBuf:替代 ByteBuffer,提供更高效和灵活的内存管理
I/O 事件处理PipelineChannelHandler:链式事件处理机制

2.1.3 Netty 的主要优点

  • 简洁性

    • 封装底层 API:隐藏了复杂的 SelectorChannel 细节,开发者无需手动管理 I/O 事件。
    • 链式处理模型:通过 PipelineChannelHandler 管理 I/O 数据流。
  • 可扩展性

    • 支持多种协议:内置 HTTP、WebSocket 等常用协议的编解码器。
    • 丰富的工具库:支持自定义协议解析和扩展。
  • 高性能

    • 线程模型优化:采用 EventLoopGroup 管理线程池,实现事件的高效分发。
    • 零拷贝:使用 ByteBuf 支持直接内存操作,避免数据在用户态与内核态之间多次复制。
    • 异步非阻塞:充分利用底层 NIO 的非阻塞特性,实现高效的 I/O 操作。

2.2 Netty 中的核心组件,以及它们之间的联系、工作流程


  1. 当服务端初始化时,需要创建并配置 EventLoopGroup(事件循环组)ServerBootstrap(服务端引导类) 等关键对象。
  2. 客户端或服务端底层通信的抽象是 Channel(通道);每个 Channel 上会挂载一个 ChannelPipeline(通道管线),其中包含多个 ChannelHandler(通道处理器) 对入站/出站数据进行处理。
  3. 数据的读写操作基于 ByteBuf(字节缓冲区) 提供更灵活高效的内存管理。
  4. 当服务器运行时,EventLoopGroup 中的线程会不断监听网络事件(连接、读、写等),并按顺序调用对应的 Handler 进行处理。
  5. 在关闭阶段,需要优雅地释放线程组和通道资源。

2.2.1 EventLoopGroup(事件循环组)

作用:管理线程池并负责处理 Channel 上的各种 I/O 事件,如 连接读写 等。

  • EventLoopGroup 的作用

    • 线程管理:为 Netty 提供线程池支持,使得多个连接可以在少量线程之间复用(非阻塞 I/O 的核心思想)。
    • 事件循环:每个 EventLoop 内部通常持有一个 Selector(在 NIO 模型下),不断轮询注册在此上的 Channel 事件(如可读、可写、连接就绪等)。
    • 调度任务:除了 I/O 事件外,EventLoopGroup 也可执行定时任务或用户自定义任务。
  • EventLoopGroup 的分类

    • Boss Group(老板线程组):主要负责处理客户端的连接请求(OP_ACCEPT)。
    • Worker Group(工人线程组):负责处理已经建立连接的读写操作(OP_READ / OP_WRITE)。
    • 不同的 EventLoopGroup 可以基于不同的实现,如 NioEventLoopGroup(基于 NIO),EpollEventLoopGroup(基于 epoll,仅在 Linux 上可用),等等。
  • EventLoopGroup 的工作流程

    • 初始化:创建 NioEventLoopGroupEpollEventLoopGroup 时,会生成对应数量的 EventLoop 线程。
    • 事件循环:这些线程会在循环中不断调用 select()poll() 方法检测 I/O 就绪事件,并回调相应的 Handler。
    • 任务执行:用户可以通过 EventLoop 提交一些定时或普通任务,这些任务会在事件循环线程中被执行。
    • 优雅关闭:在关闭阶段,调用 shutdownGracefully() 来结束事件循环并释放资源。

2.2.2 ServerBootstrap / Bootstrap(服务端引导类/客户端引导类)

作用:配置并启动 Netty 应用程序的核心入口。

1. ServerBootstrap(服务端引导类)

用于 服务端 的配置与启动。例如:

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置 Boss 和 Worker 线程组
         .channel(NioServerSocketChannel.class) // 设置服务端Channel类型
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new MyHandler()); // 添加自定义Handler
             }
         });

主要功能

  • 绑定端口bind(port))并监听客户端连接;
  • 关联线程组group(...))让 BossGroup 处理连接,WorkerGroup 处理读写;
  • 指定 Channel 类型(如 NioServerSocketChannel);
  • childHandler:对每个新建的 SocketChannel 进行初始化(设置 pipeline、handler 等)。

2. Bootstrap(客户端引导类)

用于 客户端 的配置与启动。例如:

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup) // 设置 EventLoopGroup
         .channel(NioSocketChannel.class) // 设置客户端 Channel类型
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new MyHandler());
             }
         });

主要功能

  • 通过 connect(host, port) 发起连接到服务端;
  • ServerBootstrap 类似,也可配置线程组、选择 Channel 实现、指定初始化 pipeline 等。

2.2.3 Channel(通道)

作用:代表一个 网络连接 的抽象,封装了底层 读写操作远程信息 等。

  • Channel 的作用

    • 网络 I/O 操作:如 read()write()connect()bind() 等;
    • 生命周期管理:包括活跃、非活跃、注册到 EventLoop、关闭等;
    • Pipeline 挂载:每个 Channel 都拥有一个 ChannelPipeline 来处理 I/O 事件。
  • Channel 的类型

    • NioServerSocketChannel:服务端监听套接字,基于 NIO。
    • NioSocketChannel:客户端或服务器端使用的普通 TCP 连接通道。
    • EmbeddedChannel:内嵌测试用的通道,主要用于单元测试。
    • EpollServerSocketChannel / EpollSocketChannel:在 Linux 上基于 epoll 实现。
  • Channel 的工作流程

    • 当服务端启动后,会创建一个 ServerSocketChannelbossGroup 监听连接;
    • 客户端连接成功后,会对应创建一个 SocketChannel,注册到 workerGroup 中,用于读写数据;
    • 应用层可以通过 channel.writeAndFlush(msg) 向远端发送数据,也可以在入站事件中读取数据;
    • 当连接断开或调用关闭方法时,Channel 进入非活跃状态并最终释放资源。

2.2.4 ChannelPipeline & ChannelHandler(通道管线 & 通道处理器)

作用:构成对 I/O 事件进行处理的 责任链 机制。

  • ChannelPipeline(通道管线)

    • 中文名称:通道管线
    • 作用:维护一组 ChannelHandler 对象的有序列表,所有入站与出站事件都会依次经过这些 Handler。
    • 特点:双向链表结构,提供 addLast(), remove(), replace() 等方法。
  • ChannelHandler(通道处理器)

    • 中文名称:通道处理器
    • 作用:实际编写业务逻辑的地方;区分 入站 (Inbound) 和 出站 (Outbound) 两种处理器。
      • InboundHandler:处理读事件、连接事件、异常事件等入站操作;
      • OutboundHandler:处理写事件、刷新事件等出站操作。

Pipeline 的工作流程

  1. 入站数据:从 Channel 读取数据 -> 按顺序调用 InboundHandler(如解码器、业务处理等)。
  2. 出站数据:调用 write() -> 按逆序调用 OutboundHandler(如编码器、压缩等)-> 最终写到远端。
  3. Handler 调用:Netty 提供了 ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 等适配器供继承,开发者可重写想要的方法(如 channelRead, channelActive, write, flush 等)。

2.2.5 ByteBuf(字节缓冲区)

作用:在 Netty 中用于读写数据的 核心缓存,比 Java NIO 的 ByteBuffer 更高效灵活。

  • ByteBuf 的特点

    • 读写指针:不需要像 ByteBuffer 一样手动 flip();ByteBuf 内部有 readerIndex、writerIndex 来标识读写位置。
    • 容量可扩展:可以自动扩容,无需手动分配过多内存。
    • 池化:Netty 提供了池化机制,减少内存分配与回收的开销。
    • 多种实现:如堆内存 (heap buffer) 与直接内存 (direct buffer)。
  • ByteBuf 的使用

    ByteBuf buffer = Unpooled.buffer(1024);         // 创建一个 1024 字节的缓冲区
    buffer.writeBytes("Hello, Netty!".getBytes());  // 写入数据
    byte[] data = new byte[buffer.readableBytes()];
    buffer.readBytes(data);                         // 读取数据
    System.out.println(new String(data));
    
    • 写数据buffer.writeBytes(...)writeInt(), writeLong() 等;
    • 读数据buffer.readBytes(...)readInt(), readLong() 等;
    • 索引操作getByte(int index) / setByte(int index) 等,不会移动 readerIndex / writerIndex。

2.2.6 Netty 工作流程

下面综合以上组件,介绍 Netty 典型的服务器工作流程,一般可分为 初始化启动事件处理关闭 四大阶段。


1. 初始化阶段

创建 EventLoopGroup(事件循环组)

EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
EventLoopGroup workerGroup = new NioEventLoopGroup();
  • bossGroup 通常只需一个线程负责处理连接;
  • workerGroup 默认线程数为 CPU 核心数 * 2,处理实际的读写 I/O。

创建并配置 ServerBootstrap(服务端引导类)

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置 EventLoopGroup
         .channel(NioServerSocketChannel.class) // 设置 Channel 类型
         .childHandler(new ChannelInitializer<SocketChannel>() { // 设置 ChannelHandler
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new MyHandler());
             }
         });
  • group(...):将 Boss/Worker 线程组与引导类关联;
  • channel(...):指定服务端 Channel 实现类型(NIO / EPOLL等);
  • childHandler(...):当新的客户端连接创建 SocketChannel 时,如何对 pipeline 进行初始化(添加业务 Handler)。

2. 启动阶段
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Netty Server started on port 8080");
  • 通过 bind(8080) 绑定端口,sync() 阻塞等待绑定完成;
  • 开始监听客户端连接请求:当客户端连接到 8080 端口时,bossGroup 会创建新的 SocketChannel 并交给 workerGroup 管理。

3. 事件处理阶段

在运行过程中,Netty 通过 事件循环 来处理 I/O 事件(连接事件、读事件、写事件、异常事件等)。

处理连接事件

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }
}
  • channelActive 被触发时,表示客户端连接建立成功。

处理读写事件

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        byte[] data = new byte[buffer.readableBytes()];
        buffer.readBytes(data);
        System.out.println("Received: " + new String(data));

        // 回显给客户端
        ctx.writeAndFlush(Unpooled.wrappedBuffer("Echo: ".getBytes(), data));
    }
}
  • 读事件:从 ByteBuf 中读取数据;
  • 写事件:使用 ctx.writeAndFlush(...) 将数据写回远端;
  • Echo:此例子直接回显客户端发送的数据。

Pipeline 的工作流程

  • 入站(Inbound):如上 channelRead,会从 pipeline 的头部开始依次调用入站 Handler;
  • 出站(Outbound):如 writeAndFlush,会反向调用出站 Handler,直至将数据写到网络中。

4. 关闭阶段

当服务器需要停止时,需要释放资源、关闭 EventLoopGroup

关闭服务器

future.channel().closeFuture().sync();
  • 等待服务器的 Channel 关闭事件完成后再继续执行。

释放资源

bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
  • 优雅地关闭线程组,确保已处理完正在进行的 I/O 事件。

完整示例代码

以下是一个完整的 Netty 服务器示例,展示了 Netty 的工作流程:

java

复制

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyEchoServer {

    public static void main(String[] args) throws InterruptedException {
        // 1. 创建 EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 负责接收连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 负责处理连接

        try {
            // 2. 创建 ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class) // 设置 Channel 类型
                     .childHandler(new ChannelInitializer<SocketChannel>() { // 设置 ChannelHandler
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ChannelPipeline pipeline = ch.pipeline();
                             pipeline.addLast(new StringDecoder()); // 解码器
                             pipeline.addLast(new StringEncoder()); // 编码器
                             pipeline.addLast(new EchoServerHandler()); // 业务处理器
                         }
                     });

            // 3. 绑定端口并启动服务器
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty Echo Server started on port 8080");

            // 4. 等待服务器关闭
            future.channel().closeFuture().sync();
        } finally {
            // 5. 关闭 EventLoopGroup
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // Echo 服务器处理器
    private static class EchoServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            // 回显消息
            ctx.writeAndFlush("Echo: " + msg);
        }
    }
}

三、Spring Boot 集成 Netty

3.1 方式一:替换默认的 Servlet 容器

Spring Boot 默认使用 Tomcat 作为内嵌的 Servlet 容器。如果我们希望使用 Netty 作为服务器,可以通过以下步骤替换默认的 Tomcat。

3.1.1 移除 Tomcat 依赖

在 Spring Boot 项目中,默认引入了 spring-boot-starter-web,它依赖 Tomcat。为了替换 Tomcat,我们需要移除 Tomcat 依赖。

方法 1:排除 Tomcat 依赖

在 pom.xml 中排除 spring-boot-starter-tomcat

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>

方法 2:使用 spring-boot-starter-webflux

  • spring-boot-starter-webflux 默认使用 Reactor Netty 作为服务器,无需手动排除 Tomcat:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

3.1.2 添加 Netty 依赖

如果选择使用 spring-boot-starter-webflux,则无需手动添加 Netty 依赖,因为它已经包含了 Reactor Netty。

如果选择手动集成 Netty,可以添加以下依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.68.Final</version>
</dependency>

3.1.3 使用 Spring WebFlux (Reactor Netty) 启动 HTTP 服务器

Spring WebFlux 是 Spring 5 引入的响应式编程框架,默认使用 Reactor Netty 作为服务器。通过 WebFlux,我们可以轻松构建非阻塞的 HTTP/REST 服务。

1. 编写 @RestController

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    @GetMapping("/hello")
    public Mono<String> hello(@RequestParam String name) {
        return Mono.just("Hello, " + name + "!");
    }
}

2. 启动应用

  • Spring Boot 会自动使用 Reactor Netty 作为服务器。

  • 访问 http://localhost:8080/hello?name=World,将会返回 Hello, World!

3. 函数式路由

  • 除了 @RestController,WebFlux 还支持函数式路由。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {

    @Bean
    public RouterFunction<ServerResponse> helloRoute() {
        return route(GET("/hello"), request -> {
            String name = request.queryParam("name").orElse("World");
            return ServerResponse.ok().bodyValue("Hello, " + name + "!");
        });
    }
}

3.1.4 适用场景

1. 不想直接维护 Netty 原生的 ServerBootstrap

  • 使用 Spring WebFlux 可以避免直接操作 Netty 的底层 API,简化开发。

2. 主要处理 HTTP/REST 请求,追求非阻塞高性能

  • WebFlux 提供了响应式编程模型,适合处理高并发的 HTTP/REST 请求。

3.2 方式二:单独运行 Netty 服务器(TCP/UDP/WebSocket)

在某些场景下,我们可能需要在 Spring Boot 应用中同时运行一个独立的 Netty 服务器,用于处理自定义协议(如 TCP、UDP、WebSocket 等)。这种方式可以保留 Spring Boot 默认的 Tomcat/Jetty 处理 HTTP 请求,同时利用 Netty 处理长连接或自定义协议。

3.2.1 与 Spring Boot 核心应用同处一个进程

在这种方式下,Netty 服务器与 Spring Boot 应用运行在同一个 JVM 进程中。我们可以将 Netty 服务器作为一个 Spring Bean 进行初始化,并通过 Spring 的依赖注入机制与其他组件(如 Service、Repository)交互。

1. 保留默认的 Tomcat/Jetty

  • Spring Boot 默认使用 Tomcat 或 Jetty 处理 HTTP 请求,这部分保持不变。

  • Netty 服务器用于处理自定义协议或长连接(如 WebSocket、TCP、UDP)。

2. 将 Netty Server 作为一个 Spring Bean

  • 在 @Configuration 类中初始化 Netty 服务器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NettyServerConfig {

    @Bean
    public EventLoopGroup bossGroup() {
        return new NioEventLoopGroup(1); // 负责接收连接
    }

    @Bean
    public EventLoopGroup workerGroup() {
        return new NioEventLoopGroup(); // 负责处理连接
    }

    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup(), workerGroup())
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         // 配置 ChannelPipeline
                         ch.pipeline().addLast(new MyHandler());
                     }
                 });
        return bootstrap;
    }
}

3.2.2 启动顺序

Netty 服务器的启动需要在 Spring Boot 应用完全初始化之后进行,以确保 Spring 的 Bean 已经准备好。

1. 在 SpringApplication.run() 之后启动

  • 使用 ApplicationRunner 或 CommandLineRunner 接口启动 Netty 服务器。

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelFuture;
import javax.annotation.Resource;

@Component
public class NettyServerRunner implements ApplicationRunner {

    @Resource
    private ServerBootstrap serverBootstrap;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ChannelFuture future = serverBootstrap.bind(8081).sync();
        System.out.println("Netty Server started on port 8081");
        future.channel().closeFuture().sync();
    }
}

2. 确保 Spring Bean 已初始化

  • 在 Netty 的 Handler 中可以通过依赖注入使用 Spring 的 Bean。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;

@Component
public class MyHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // 处理消息
        System.out.println("Received: " + msg);
        ctx.writeAndFlush("Echo: " + msg);
    }
}

3.2.3 适用场景

1. 聊天、游戏、物联网等长连接服务

  • Netty 非常适合处理长连接场景,如即时通讯、游戏服务器、物联网设备通信等。

2. 自定义 TCP/UDP/WebSocket 协议

  • 如果需要处理自定义协议,Netty 提供了灵活的 API 来实现协议编解码。


3.2.4 具体使用实例:简易聊天室

下面我们通过一个 简易聊天室 的实例,展示如何使用 Netty 实现 WebSocket 服务器,并与 Spring Boot 集成。

1. WebSocket 基础

  • WebSocket 是一种全双工通信协议,适用于即时消息、聊天室、推送通知等场景。

  • Netty 提供了对 WebSocket 的支持,可以通过 WebSocketServerProtocolHandler 快速实现 WebSocket 服务器。

2. Netty WebSocket Pipeline 配置

  • 配置 HttpServerCodecHttpObjectAggregator 和 WebSocketServerProtocolHandler

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new HttpServerCodec()); // HTTP 编解码器
        ch.pipeline().addLast(new HttpObjectAggregator(65536)); // 聚合 HTTP 请求
        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws")); // WebSocket 协议处理器
        ch.pipeline().addLast(new ChatHandler()); // 自定义处理器
    }
}

3. ChatHandler 实现

  • 处理 WebSocket 连接、消息接收和广播。

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        Channel incoming = ctx.channel();
        channels.add(incoming);
        System.out.println("Client connected: " + incoming.remoteAddress());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel incoming = ctx.channel();
        channels.remove(incoming);
        System.out.println("Client disconnected: " + incoming.remoteAddress());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        String message = msg.text();
        System.out.println("Received: " + message);
        // 广播消息
        channels.writeAndFlush(new TextWebSocketFrame("Broadcast: " + message));
    }
}

4. 与 Spring Boot 集成

  • 在 Spring Boot 中初始化 Netty WebSocket 服务器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WebSocketServerConfig {

    @Bean
    public EventLoopGroup bossGroup() {
        return new NioEventLoopGroup(1);
    }

    @Bean
    public EventLoopGroup workerGroup() {
        return new NioEventLoopGroup();
    }

    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup(), workerGroup())
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new WebSocketServerInitializer());
        return bootstrap;
    }
}

5. 启动 Netty WebSocket 服务器

  • 使用 ApplicationRunner 启动 Netty 服务器。

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelFuture;
import javax.annotation.Resource;

@Component
public class WebSocketServerRunner implements ApplicationRunner {

    @Resource
    private ServerBootstrap serverBootstrap;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ChannelFuture future = serverBootstrap.bind(8081).sync();
        System.out.println("WebSocket Server started on port 8081");
        future.channel().closeFuture().sync();
    }
}

3.3 方式三:Spring Boot + Netty 文件服务

注意!

  • 以下方案中所谓的方式三其实也是类似于方式二,也是单独搭建了个服务器,只不过这里搭建的这个Netty服务器与上面的Netty服务器的内部组件不一样。
  • 不需要排除 Tomcat;让 Spring Boot 正常启动 Tomcat 处理 server.port=8080 上的 REST、网页、静态资源等。同时,你手动初始化一个 Netty 服务器监听 另一 端口(如 9090),处理你想交给 Netty 的 HTTP 任务(这里示例了文件下载)。
  • 其实完全可以通过使用Spring WebFlux替代Spring MVC来去享受Netty的异步与高并发特性,因为Spring WebFlux的底层就是Reactor Netty(又对Netty做了封装),这种方式可以不用自己去搭建Netty服务器,更加方便。

3.3.1. 为什么要让 Tomcat 与 Netty 共存?

  • 保留 Spring MVC 的便利

    • Spring Boot 默认整合 Tomcat,使用 @RestController@RequestMapping 等注解式模式非常简单高效。
    • 提供常规 Web API、静态资源、过滤器等功能也不需要额外配置。
  • 同时享受 Netty 的高性能/高灵活

    • Netty 对高并发、大文件、WebSocket、TCP/UDP、自定义协议等场景具有强大支持。
    • 某些场景想用原生 Netty 来深度定制数据传输或协议处理,Tomcat 不方便做这一块。
  • 不同端口分工

    • Tomcat:以默认的 8080(或其他)端口运行,处理常规请求。
    • Netty:以 9090(或其他)端口运行,用于高并发文件下载、WebSocket 聊天、RPC 等需求。

结论:二者并存可以让你 在一个项目 中统一管理依赖与配置,既不失去 Tomcat 的开发便利,也能引入 Netty 做特定高性能/自定义场景。


3.3.2. 项目依赖与结构

Maven 依赖

<dependencies>
    <!-- Spring Boot Web Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- 注意:不排除Tomcat -->
    </dependency>

    <!-- Netty 核心依赖 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.68.Final</version>
    </dependency>
</dependencies>
  • spring-boot-starter-web:Spring MVC + 内嵌 Tomcat。
  • netty-all:引入 Netty 的全部组件,方便示例;生产可精简。

目录示例

src/main/java
└── com.example.demo
    ├── DemoApplication.java                   # Spring Boot 启动入口
    ├── controller
    │    └── HelloController.java              # Tomcat 默认处理
    ├── netty
    │    ├── NettyHttpProperties.java          # Netty的配置读取
    │    ├── NettyHttpServerConfig.java        # Netty线程组、ServerBootstrap
    │    ├── HttpFileServerHandler.java        # 自定义Handler, 提供HTTP下载
    │    └── NettyServerRunner.java            # 绑定端口, 启动Netty
    └── ...
src/main/resources
└── application.properties

application.properties 

# ========== Tomcat配置 ==========
server.port=8080

# ========== Netty配置 ==========
netty.http.port=9090
netty.http.baseDir=D:/netty-files
netty.http.bossThreads=1
netty.http.workerThreads=4
  • server.port=8080:Spring Boot 默认 Tomcat 的端口;
  • netty.http.port=9090:Netty 的端口;
  • netty.http.baseDir:文件存放目录(如 D:/netty-files);
  • netty.http.bossThreadsnetty.http.workerThreads:Netty 线程数。

3.3.3 Tomcat 保留

示例:HelloController

package com.example.demo.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 这是一个走Tomcat容器的普通Spring MVC接口
 */
@RestController
public class HelloController {

    @GetMapping("/hello")
    public String hello() {
        return "Hello from Tomcat (default port 8080)!";
    }
}
  • 你可以访问 http://localhost:8080/hello 即可看到此响应。
  • Tomcat 会自动在端口 server.port=8080(若不指定则 8080)运行。

3.3.4 Netty 服务器配置

NettyHttpProperties

先做一个专门的配置类用于读取 Netty 端口、线程数以及文件根目录等信息:

package com.example.demo.netty;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "netty.http")
public class NettyHttpProperties {

    private int port;           // Netty监听端口
    private String baseDir;     // 本地文件存储根目录
    private int bossThreads;    // bossGroup线程数
    private int workerThreads;  // workerGroup线程数

    // Getter/Setter
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getBaseDir() {
        return baseDir;
    }
    public void setBaseDir(String baseDir) {
        this.baseDir = baseDir;
    }
    public int getBossThreads() {
        return bossThreads;
    }
    public void setBossThreads(int bossThreads) {
        this.bossThreads = bossThreads;
    }
    public int getWorkerThreads() {
        return workerThreads;
    }
    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }
}

要点

  • @Configuration + @ConfigurationProperties(prefix = "netty.http") 会让 Spring Boot 自动从 application.properties 中读取 netty.http.* 配置并注入到该类。
  • 这样就能在其他地方 @Autowired NettyHttpProperties 来使用。

NettyHttpServerConfig

创建并配置 ServerBootstrap 等核心 Bean:

package com.example.demo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NettyHttpServerConfig {

    @Bean
    public EventLoopGroup bossGroup(NettyHttpProperties props) {
        return new NioEventLoopGroup(props.getBossThreads());
    }

    @Bean
    public EventLoopGroup workerGroup(NettyHttpProperties props) {
        return new NioEventLoopGroup(props.getWorkerThreads());
    }

    @Bean
    public ServerBootstrap serverBootstrap(EventLoopGroup bossGroup,
                                          EventLoopGroup workerGroup,
                                          NettyHttpProperties props,
                                          HttpFileServerHandler fileHandler) {
        // 创建ServerBootstrap
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<Channel>() {
             @Override
             protected void initChannel(Channel ch) {
                 // 初始化pipeline
                 ch.pipeline().addLast(new HttpServerCodec());          // HTTP编解码
                 ch.pipeline().addLast(new HttpObjectAggregator(65536));// 聚合HTTP消息
                 ch.pipeline().addLast(new HttpContentCompressor());    // 响应压缩(可选)
                 ch.pipeline().addLast(new ChunkedWriteHandler());      // 大数据流写支持
                 ch.pipeline().addLast(fileHandler);                    // 自定义文件处理
             }
         });
        return b;
    }
}

逐段解释

  • EventLoopGroup bossGroupEventLoopGroup workerGroup

    • Netty 典型的双线程组模型:一个专门处理连接(bossGroup),另一个处理读写(workerGroup)。
    • 线程数从 NettyHttpProperties 读出,便于后期调整。
  • ServerBootstrap

    • group(...) 传入这两个线程组;

    • channel(NioServerSocketChannel.class) 表示用 NIO 实现;

    • childHandler(...) 给每个连接的 Channel 初始化 pipeline:

      • HttpServerCodec:将字节流与 HTTP 对象互相转换
      • HttpObjectAggregator(65536):把分段的请求合并成 FullHttpRequest,方便后面处理
      • HttpContentCompressor():可选,对响应进行 gzip 等压缩
      • ChunkedWriteHandler():分块写大数据
      • HttpFileServerHandler:自定义的业务处理(文件下载、请求路由等)。

HttpFileServerHandler

这是一个 Netty Handler,示例展示如何提供文件下载功能。例如,当访问 GET http://localhost:9090/download?filename=xxx 时,把对应的文件返回给客户端。

package com.example.demo.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;

@Component
@ChannelHandler.Sharable
public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final NettyHttpProperties props;

    public HttpFileServerHandler(NettyHttpProperties props) {
        this.props = props;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 解析URI和查询参数
        QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
        String path = decoder.path();            // e.g. "/download"
        String filename = null;
        if (decoder.parameters().containsKey("filename")) {
            filename = decoder.parameters().get("filename").get(0);
        }

        // 如果路径不是/download 或没有filename参数, 返回错误
        if (!"/download".equals(path) || filename == null) {
            sendError(ctx, HttpResponseStatus.NOT_FOUND, "Usage: /download?filename=xxx");
            return;
        }

        // 构造文件对象
        File file = new File(props.getBaseDir(), filename);
        if (!file.exists() || !file.isFile()) {
            sendError(ctx, HttpResponseStatus.NOT_FOUND, "File not found: " + filename);
            return;
        }

        // 返回文件数据
        writeFileResponse(ctx, file);
    }

    /**
     * 将文件内容通过HTTP响应写回
     */
    private void writeFileResponse(ChannelHandlerContext ctx, File file) throws Exception {
        // 1) 先写一个响应头
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);

        long fileLength = file.length();
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
        // 告知客户端以附件形式下载
        response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getName() + "\"");

        ctx.write(response);

        // 2) ChunkedStream 方式写文件内容
        FileInputStream fis = new FileInputStream(file);
        HttpChunkedInput chunkedInput = new HttpChunkedInput(new ChunkedStream(fis));

        ChannelFuture sendFileFuture = ctx.writeAndFlush(chunkedInput);
        sendFileFuture.addListener((ChannelFutureListener) future -> {
            fis.close();
        });
    }

    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status, String msg) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, status,
                Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)
        );
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        // 响应后关闭连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}

核心逻辑

  • channelRead0

    • 接收到一个完整的 FullHttpRequest
    • 我们解析 uri,只处理 "/download" 路径,且需要一个 filename 查询参数;
    • 如果文件不存在或参数缺失,就返回错误给客户端。
  • writeFileResponse

    • 先写一个 HTTP 响应行与头部,包含 Content-LengthContent-TypeContent-Disposition 等;
    • 使用 ChunkedStream 分块传输文件数据,无需把文件读进内存;
    • 也可以使用 DefaultFileRegion 实现零拷贝,但需考虑操作系统对 sendfile() 的支持,以及禁用 HttpContentCompressor(零拷贝与压缩冲突)。
  • @ChannelHandler.Sharable

    • 表示此 Handler 可以被多个 Channel 共享(前提:无内部状态竞争)。

NettyServerRunner

最后,用 ApplicationRunner 来在 Spring Boot 启动完成后,绑定 Netty 的端口。二者(Tomcat 与 Netty)在同一进程运行,只是端口不同。

package com.example.demo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class NettyServerRunner implements ApplicationRunner {

    @Resource
    private ServerBootstrap serverBootstrap;

    @Resource
    private NettyHttpProperties props;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 绑定Netty端口(与Tomcat不同)
        ChannelFuture future = serverBootstrap.bind(props.getPort()).sync();
        System.out.println("Netty Server started on port " + props.getPort());

        // 阻塞等待关闭事件
        future.channel().closeFuture().sync();
    }
}

解释

  • serverBootstrap.bind(props.getPort()):如 9090
  • sync():阻塞当前线程直到绑定成功;若失败会抛出异常。
  • closeFuture().sync():保证 Netty 不会提前退出,一直运行到收到关闭事件。

3.3.5. 特殊说明

  • 统一域名?

    • 若要在同一域名上,只是不同端口,如 http://yourdomain:8080http://yourdomain:9090
    • 如果想只对外暴露 80/443 等标准端口,可使用 Nginx 反向代理:
      • 路径 /api -> 8080 (Tomcat)
      • 路径 /files -> 9090 (Netty)
    • 这样客户端感知不到两个不同端口。
  • 上传功能

    • 当前 Handler 只演示下载,若需要文件上传(POST /upload),可在 HttpFileServerHandler 中解析 multipart/form-data
    • Netty 官方提供 HttpPostRequestDecoder,或你自己处理二进制流。
  • 零拷贝

    • 对超大文件下载,你可改用 DefaultFileRegion。注意:若要开启零拷贝,就需关闭 HttpContentCompressor(否则压缩与零拷贝冲突)。
    • 同时确认操作系统支持 sendfile
  • 性能调优

    • Tomcat 与 Netty 线程数都可自定义,根据机器 CPU 核数、预计并发量进行调优。
    • 如果 Netty 主要用来跑大文件下载,可适当提高 workerThreads,并使用 EpollEventLoopGroup(Linux下)来进一步提升性能。
  • 会不会造成资源浪费?

    • 在某些场景下,两套容器各自占用线程池与内存。但这是必要的代价,如果你确实需要保留 Tomcat 的生态又想使用 Netty 特性。
  • 安全考虑

    • 如果要 HTTPS,可以分别对 Tomcat 和 Netty 做 SSL 配置(Tomcat SSL Connector、Netty SslHandler)。
    • 如果需要统一认证,可以让两端共享同一个认证服务或 token 验证逻辑。

http://www.kler.cn/a/514460.html

相关文章:

  • linux-ubuntu学习笔记碎记
  • Kubernetes 集群中安装和配置 Kubernetes Dashboard
  • C语言程序设计十大排序—选择排序
  • T-SQL语言的数据库编程
  • linux下一些参数的说明
  • 简述mysql 主从复制原理及其工作过程,配置一主两从并验证
  • mapbox加载geojson,鼠标移入改变颜色,设置样式,vue中使用方法
  • 【docker-1】快速入门docker
  • java 根据前端传回的png图片数组,后端加水印加密码生成pdf,返回给前端
  • ELK介绍
  • 前沿技术趋势洞察:2024年技术的崭新篇章与未来走向!
  • (2)STM32 USB设备开发-USB虚拟串口
  • Android SystemUI——系统快捷设置面板(十三)
  • HTML<form>标签
  • suctf2025
  • Java 中 final 关键字的奥秘
  • 锐捷路由器网关RG-NBR6135-E和锐捷交换机 Ruijie Reyee RG-ES224GC 电脑登录web方法
  • IDEA导入Maven工程不识别pom.xml
  • 5G/4G+北斗三号水利遥测终端机RTU-打造水利工程的智能核心
  • Azure面试
  • PHP语言的语法糖
  • Java 设计模式 二 单例模式 (Singleton Pattern)
  • STM32学习9---EXIT外部中断(理论)
  • qiankun+vite+vue3
  • spring security StackOverflowError 问题解决
  • 腾讯云AI代码助手评测:如何智能高效完成Go语言Web项目开发