当前位置: 首页 > article >正文

基于Netty的自定义协议栈设计与编解码技术解析

基于Netty的自定义协议栈设计与编解码技术解析

1. 项目概述

Netty 是一款高性能的网络通信框架,广泛应用于分布式系统、即时通讯、游戏服务器等领域。本项目基于 Netty 框架,设计并实现了一个自定义协议栈,用于构建高效、可靠的网络通信机制。

项目以 协议栈设计 为核心,通过精细化的消息定义、编解码策略以及连接管理,保障通信的完整性、灵活性和高效性。以下是项目的主要功能和实现细节:

1.1 自定义协议设计

本项目实现了一种轻量级的自定义通信协议,由 消息头(Header)消息体(Body) 组成。

  • 消息头字段
    • crcCode:用于校验消息的完整性,确保数据在传输过程中未被篡改。
    • length:消息总长度,包括消息头和消息体,用于接收端解析。
    • type:标识消息类型(如登录请求、心跳包等),便于路由和处理。
    • priority:消息优先级,用于特定场景下的优先级调度。
    • sessionID:会话标识,确保消息的唯一性并支持分布式追踪。
    • 附加字段attachment):支持扩展功能,如携带业务元信息。
  • 消息体字段
    • 可存储任意业务数据,支持通过 Marshalling 技术进行高效的序列化与反序列化。

这种设计不仅简化了消息的处理逻辑,同时也增强了协议的扩展性。

1.2 编解码策略

通过 Netty 提供的 ChannelPipeline,项目实现了 自定义消息编解码器,将网络数据流与业务对象无缝对接:

  • 解码器:解析接收的字节流,提取并组装为协议对象(NettyMessage)。
  • 编码器:将协议对象转换为字节流,便于在网络上传输。

项目中通过 JBoss Marshalling 技术实现消息体的序列化与反序列化,提高了数据处理的效率和灵活性。

1.3 核心功能实现
  1. 连接管理与消息处理
    • 服务端通过 NettyServer 初始化,配置了消息的编解码器以及业务逻辑处理器(如登录认证和心跳响应)。
    • 客户端通过 NettyClient 实现连接与消息交互,并支持断线重连功能。
  2. 登录认证
    • 客户端:在连接建立后发送登录请求(LOGIN_REQ)。
    • 服务端:通过 LoginAuthRespHandler 校验客户端 IP 是否在白名单中,并返回认证结果(LOGIN_RESP)。
    • 安全性:服务端使用白名单校验机制,确保只有合法客户端可以访问。
  3. 心跳机制
    • 客户端:定期发送心跳请求(HEARTBEAT_REQ),保持连接活跃。
    • 服务端:通过 HeartBeatRespHandler 响应心跳请求(HEARTBEAT_RESP)。
    • 功能:有效防止长时间无数据传输导致的连接断开。
  4. 断线重连
    • 客户端在连接断开时,会启动自动重连机制,支持延时重连和递归调用,确保通信的稳定性。
1.4 项目技术点
  1. 高效通信:
    • 使用自定义协议,支持小消息体的高效传输。
    • 消息头字段设计简洁,支持精确路由和灵活扩展。
  2. 可靠性保障:
    • 通过 crcCode 校验机制和长度字段,确保消息完整性。
    • 心跳机制监控连接状态,自动检测异常。
  3. 安全性增强:
    • IP 白名单机制,防止非法客户端访问。
    • 登录认证层实现了身份校验。
  4. 可扩展性:
    • Header 支持扩展的附加字段(attachment),适应不同业务需求。
    • 使用 Marshalling 提高了消息体的序列化灵活性,可替换为 Protobuf、JSON 等其他序列化方案。

2.项目结构概述

在这里插入图片描述

1. 项目目录结构
bash复制编辑com.bfxy.netty
├── common                 # 公共模块(协议定义、消息结构、常量等)
│   ├── MessageType.java   # 消息类型枚举
│   ├── NettyConstant.java # 常量类,定义IP、端口、协议长度等
│   ├── NettyMessage.java  # 自定义消息结构
│   └── Header.java        # 消息头部定义
│
├── codec                  # 编解码模块
│   ├── NettyMessageDecoder.java # 自定义解码器
│   ├── NettyMessageEncoder.java # 自定义编码器
│   └── MarshallingCodecFactory.java # 编解码工具类(可选)
│
├── client                 # 客户端模块
│   ├── NettyClient.java       # 客户端启动类
│   ├── LoginAuthReqHandler.java # 登录认证处理器
│   └── HeartBeatReqHandler.java # 心跳处理器
│
├── server                 # 服务端模块
│   ├── NettyServer.java       # 服务端启动类
│   ├── LoginAuthRespHandler.java # 登录认证处理器
│   └── HeartBeatRespHandler.java # 心跳响应处理器
│
└── handler                # 通用业务处理模块
    └── (可以扩展业务处理逻辑,如具体的业务消息处理器)

2. 模块职责说明
2.1 公共模块(common)

公共模块存储整个项目中通用的定义,包括消息类型、协议常量、消息结构等:

  1. MessageType
    • 定义消息类型的枚举(如 LOGIN_REQLOGIN_RESPHEARTBEAT_REQ 等)。
    • 每种类型通过字节值唯一标识,用于协议识别。
  2. NettyConstant
    • 定义通信相关的常量,如服务端的 IP 地址、端口、消息最大长度等。
    • 提高代码的可维护性和可配置性,避免硬编码。
  3. NettyMessage
    • 定义消息结构,包括消息头(Header)和消息体(body)。
    • 通过 Header 存储协议相关元数据(如消息长度、类型、校验码等),body 存储实际业务数据。
  4. Header
    • 消息头部的定义,与 NettyMessage 组合使用。
    • 包含 crcCode(校验码)、length(消息长度)、type(消息类型)、priority(优先级)等字段。

2.2 编解码模块(codec)
  1. NettyMessageDecoder
    • 自定义解码器,解析接收到的字节流,将其转换为 NettyMessage 对象。
    • 支持协议头的解析和消息体的反序列化。
  2. NettyMessageEncoder
    • 自定义编码器,将 NettyMessage 对象序列化为字节流,便于通过网络传输。
  3. MarshallingCodecFactory
    • 提供 Marshalling 编解码工具的工厂方法,便于创建序列化器和反序列化器。
    • 可替换为其他高效的序列化机制(如 Protobuf、JSON 等)。

2.3 服务端模块(server)

服务端模块管理与客户端的连接,并处理其发送的请求(包括登录认证和心跳机制):

  1. NettyServer
    • 服务端启动类,负责配置 Netty 服务器,绑定端口并监听连接。
    • 通过 ServerBootstrap 设置事件循环组、通道类型及管道处理器(pipeline)。
  2. LoginAuthRespHandler
    • 处理客户端的登录请求(LOGIN_REQ),校验客户端身份。
    • 基于 IP 白名单进行认证,返回登录响应(LOGIN_RESP)。
  3. HeartBeatRespHandler
    • 处理客户端发送的心跳请求(HEARTBEAT_REQ),返回心跳响应(HEARTBEAT_RESP)。
    • 确保服务端与客户端的连接处于活跃状态。

2.4 客户端模块(client)

客户端模块主要负责与服务端建立连接,发送业务请求并接收响应:

  1. NettyClient
    • 客户端启动类,配置 Bootstrap 并连接到服务端。
    • 管理连接超时、自动重连、心跳维持等逻辑。
  2. LoginAuthReqHandler
    • 在客户端启动时发送登录请求(LOGIN_REQ),并处理服务端返回的登录响应(LOGIN_RESP)。
    • 如果登录成功,启动心跳机制;否则关闭连接。
  3. HeartBeatReqHandler
    • 管理客户端的心跳请求。
    • 通过定时任务定期发送心跳消息(HEARTBEAT_REQ),确保连接的活跃性。
2.5 引入依赖

在 Maven 项目中添加以下依赖:

<dependencies>
    
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
    </dependency> 
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
    </dependency>
       <dependency>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
       </dependency>       
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.12.Final</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>25.1-jre</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.7.1</version>
    </dependency>
    <dependency>
       <groupId>org.jboss.marshalling</groupId>
       <artifactId>jboss-marshalling</artifactId>
       <version>1.3.0.CR9</version>
    </dependency>          
    <dependency>
       <groupId>org.jboss.marshalling</groupId>
       <artifactId>jboss-marshalling-serial</artifactId>
       <version>1.3.0.CR9</version>
    </dependency>     
       <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.60</version>
       </dependency>       
</dependencies>

3. 客户端与服务端的交互流程
  1. 客户端通过 NettyClient 连接到服务端,并发送登录请求(LOGIN_REQ)。
  2. 服务端通过 LoginAuthRespHandler 校验登录信息,返回响应消息(LOGIN_RESP)。
  3. 客户端接收到登录响应后,启动定时任务,定期发送心跳请求(HEARTBEAT_REQ)。
  4. 服务端通过 HeartBeatRespHandler 接收心跳请求,返回心跳响应(HEARTBEAT_RESP)。
  5. 客户端持续发送心跳请求,保持连接活跃。
  6. 双方可以在此基础上扩展其他业务逻辑,如发送业务请求消息。

最终架构图

rust复制编辑客户端 (NettyClient)
    |
    +--> LoginAuthReqHandler ---> 服务端 (NettyServer)
    |                                |
    +--> HeartBeatReqHandler         +--> LoginAuthRespHandler
                                     |
                                     +--> HeartBeatRespHandler

总结

  • 模块化设计:项目分为公共模块、编解码模块、服务端模块和客户端模块,职责清晰,易于扩展。
  • 协议规范:通过自定义协议(NettyMessage)实现高效的消息传输。
  • 连接管理:通过登录认证和心跳机制,确保连接的安全性和活跃性。
  • 自动重连:客户端具有断线重连能力,提升系统的稳定性。

这种结构设计有助于扩展业务逻辑,同时保持代码的高可读性和可维护性。


2.1.1 协议模块分析

MessageType 类

MessageType 是一个枚举类型,定义了不同类型的消息,主要用于标识消息的种类,并根据这些类型来进行消息的编解码、路由处理等。每个 MessageType 对应一个特定的消息类型,例如服务请求、心跳请求、登录请求等。

枚举类有以下几个关键点:

  1. 消息类型的枚举
    • 每个枚举常量代表一个消息类型,在协议中有明确的定义。
    • 这些枚举值通常是通信协议中的关键部分,决定了消息的处理方式,比如是否需要鉴权、是否需要返回响应等。
  2. byte 类型作为消息标识
    • 使用 byte 类型存储每个消息类型的值(通常为一个字节),可以有效节省内存空间,并且由于网络协议中通常采用字节传输,因此这种表示方式在网络通信中具有更好的兼容性。
  3. value() 方法
    • 该方法返回枚举常量对应的字节值。在编解码过程中,这个值会被用来识别消息类型。
/**
 * 定义消息类型的枚举类,所有的消息类型通过字节值来区分。
 */
public enum MessageType {
    /**
     * 业务请求消息
     */
    SERVICE_REQ((byte) 0),

    /**
     * 业务响应消息
     */
    SERVICE_RESP((byte) 1),

    /**
     * 业务ONE WAY消息,既是请求又是响应
     */
    ONE_WAY((byte) 2),

    /**
     * 握手请求消息
     */
    LOGIN_REQ((byte) 3),

    /**
     * 握手响应消息
     */
    LOGIN_RESP((byte) 4),

    /**
     * 心跳请求消息
     */
    HEARTBEAT_REQ((byte) 5),

    /**
     * 心跳响应消息
     */
    HEARTBEAT_RESP((byte) 6);

    private byte value;

    /**
     * 构造方法
     * 
     * @param value 消息类型的字节值
     */
    private MessageType(byte value) {
        this.value = value;
    }

    /**
     * 获取该枚举常量的字节值
     * 
     * @return 字节值
     */
    public byte value() {
        return this.value;
    }
}
功能分析
  1. 业务消息标识
    • SERVICE_REQSERVICE_RESP 分别代表业务请求和业务响应消息。它们是基于请求-响应模型的消息类型。
    • ONE_WAY 是一种特殊的消息类型,既可以作为请求,也可以作为响应。它在一些特定场景中使用,譬如事件驱动模型或者一些不需要返回数据的请求。
  2. 协议控制消息
    • LOGIN_REQLOGIN_RESP 是用于握手和身份验证的消息类型,客户端和服务端通过这类消息进行连接的初始化、认证等操作。
    • HEARTBEAT_REQHEARTBEAT_RESP 用于维持连接的活跃状态,常用于长时间未发送数据的连接中。
  3. 注释
    • 为了使类更加易于理解,我为每个枚举常量添加了注释,描述了每种消息类型的作用。
NettyConstant

NettyConstant 类是一个常量类,通常用于存放项目中使用的固定常量(如端口号、IP 地址、一些协议常量等),这些常量在整个项目中都需要使用,可以避免硬编码和提高代码的可维护性。

常见的常量类设计有以下几个特点:

  1. 静态常量:常量应该是 static final,表示一旦定义后就不能修改。
  2. 私有构造方法:为了防止实例化常量类,通常会将构造方法设置为 private
  3. 规范命名:常量的命名要符合大写字母加下划线分隔的命名规则。
优化后的 NettyConstant
/**
 * 存放与网络通信相关的常量类,如 IP 地址、端口等。
 */
public final class NettyConstant {

    /**
     * 服务端远程IP地址
     */
    public static final String REMOTEIP = "127.0.0.1";

    /**
     * 服务端端口号
     */
    public static final int PORT = 8080;

    /**
     * 消息长度的最大限制
     */
    public static final int MAX_MESSAGE_LENGTH = 1024 * 1024;  // 1MB

    /**
     * 协议头部长度,通常是固定的
     */
    public static final int HEADER_LENGTH = 4 + 4 + 1 + 1; // CRC + Length + Type + Priority

    /**
     * 消息类型与心跳检测间隔
     */
    public static final long HEARTBEAT_INTERVAL = 30L;  // 30秒

    /**
     * 私有构造方法,防止实例化
     */
    private NettyConstant() {
        throw new UnsupportedOperationException("This is a constants class and cannot be instantiated");
    }
}
功能分析
  1. REMOTEIPPORT
    • 这两个常量定义了服务端的远程 IP 地址和端口号,所有客户端和服务端的通信都必须遵循这一配置,确保它们之间可以顺利建立连接。
  2. MAX_MESSAGE_LENGTH
    • 限制消息的最大长度,防止一次性发送过大的数据,造成内存溢出或性能问题。
  3. HEADER_LENGTH
    • 头部长度通常是协议的固定部分。这里假设头部包含了 CRC 校验、消息长度、消息类型和优先级等信息,计算出总长度。
  4. HEARTBEAT_INTERVAL
    • 设置心跳的间隔时间,这有助于维护连接的活跃状态,避免超时或连接断开。
  5. 构造方法
    • 将构造方法私有化,防止在其他地方创建 NettyConstant 类的实例,确保常量类的单一实例性和良好的设计。
总结
  • MessageType 枚举类 定义了协议中所有的消息类型,每种消息类型使用一个字节进行标识,便于后续的消息处理和路由。
  • NettyConstant 常量类 存储了所有与协议相关的常量,确保了在项目中不出现硬编码的情况,增强了代码的可维护性和可配置性。

2.1.1 协议模块分析

struct 通常是用来表示消息的固定结构体,其中包含协议中的各个字段,如消息头部、长度、类型、优先级等。这部分代码主要定义了消息结构,以及如何在字节流和 Java 对象之间进行转换。

假设您项目中的 struct 相关代码位于 com.bfxy.netty.struct 包下(这是常见的做法),以下是相关代码的具体分析:

3.1 NettyMessage

NettyMessage 类通常是消息的主体,承载了协议的各个字段,包括协议头和消息体。它的定义一般类似于下面的结构:

public class NettyMessage {
    private int crcCode;     // CRC 校验码,用于校验数据完整性
    private int length;      // 消息长度
    private byte type;       // 消息类型
    private byte priority;   // 消息优先级
    private Object body;     // 消息体,包含实际的数据内容

    // 构造方法
    public NettyMessage(int crcCode, int length, byte type, byte priority, Object body) {
        this.crcCode = crcCode;
        this.length = length;
        this.type = type;
        this.priority = priority;
        this.body = body;
    }

    // 各个属性的getter和setter方法
    public int getCrcCode() {
        return crcCode;
    }

    public void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }
}
解读:
  • CRC 校验码 (crcCode):用于检测数据在传输过程中是否发生了变化。校验值通常基于消息内容进行计算,确保消息的完整性和可靠性。
  • 消息长度 (length):表示消息的总长度,包括消息头部和消息体的字节数。该字段确保接收方能够正确地读取完整的消息。
  • 消息类型 (type):标识消息的类型。例如,是否是登录请求、心跳请求等。这个字段决定了接收方如何处理不同的消息。
  • 优先级 (priority):用于指定消息的优先级,可以影响消息处理的顺序。
  • 消息体 (body):消息的实际内容,通常是一个 Java 对象,可能包含业务数据。这个字段的数据类型是 Object,因此可以存放任意 Java 对象。
3.2 NettyMessage 与协议头

消息的结构通常包括两个部分:

  • 消息头部:包括 crcCodelengthtypepriority 等字段,这些字段是协议的基础元数据。
  • 消息体:实际的业务数据。通过序列化(Marshalling)进行处理。

对于解码和编码来说,最常见的操作是通过 ByteBuf 读取和写入这些字段:

// 读取消息头
int crc = in.readInt();     // 读取CRC校验码
int length = in.readInt();  // 读取消息长度
byte type = in.readByte();  // 读取消息类型
byte priority = in.readByte(); // 读取消息优先级

// 读取消息体
Object body = marshallingDecoder.decode(in);

通过这样的方式,可以将接收到的字节流解析为 NettyMessage 对象。反过来,在编码时,将 NettyMessage 的各个字段序列化为字节流。

3.3 struct 包的角色与作用

struct 在协议栈中的主要作用是定义和处理消息的结构体。每个协议都有自己的结构,这个结构在项目中有以下重要作用:

  • 消息格式的标准化struct 确保每个消息的结构一致,使得消息传输和处理成为可能。
  • 与字节流的转换:通过 ByteBuf 或其他流的方式,将字节流与消息对象(如 NettyMessage)之间进行互相转换。
  • 校验机制:通过 crcCode 等字段实现对消息的完整性校验。

4. Marshalling 编码器解析与 Netty 的集成

在网络通信中,数据的高效序列化和反序列化是实现系统高性能的关键。Marshalling 编解码技术在本项目中用于将 Java 对象转换为字节流,以及将字节流重新转换为 Java 对象。这一技术在数据传输过程中,尤其是结合 Netty 框架时,显得尤为重要。

4.1 Marshalling 编码器的实现

Marshalling 编解码器通过使用 JBoss 提供的序列化机制,将 Java 对象与字节流进行互转。具体来说,我们在 MarshallingCodecFactory 中实现了编解码器的工厂方法:

public class MarshallingCodecFactory {

    // 创建 Marshalling 解码器
    public static MarshallingDecoder buildMarshallingDecoder() throws Exception {
        // 创建一个 JBOSS Marshaller 对象
        MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        Marshaller marshaller = marshallerFactory.createMarshaller(new MarshallingConfiguration());

        // 返回解码器
        return new MarshallingDecoder(marshaller);
    }

    // 创建 Marshalling 编码器
    public static MarshallingEncoder buildMarshallingEncoder() throws Exception {
        // 创建一个 JBOSS Unmarshaller 对象
        UnmarshallerFactory unmarshallerFactory = Marshalling.getProvidedUnmarshallerFactory("serial");
        Unmarshaller unmarshaller = unmarshallerFactory.createUnmarshaller(new MarshallingConfiguration());

        // 返回编码器
        return new MarshallingEncoder(unmarshaller);
    }
}

通过这些工厂方法,我们实现了编码器与解码器的创建。MarshallingDecoderMarshallingEncoder 分别负责对象的反序列化和序列化任务。

4.2 编码与解码流程

在 Netty 中,数据的编码和解码是通过 ChannelHandler 完成的。MarshallingDecoderMarshallingEncoder 分别负责将字节流转换为 Java 对象,或者将 Java 对象转换为字节流。

  1. 解码过程
    • MarshallingDecoder 通过反序列化操作,将字节流转换为 Java 对象。在 NettyMessageDecoder 中,MarshallingDecoder 用来解析字节流中的消息体部分,并将其转换为 Java 对象(通常是 NettyMessagebody 字段)。
    • 解码流程的核心在于 ChannelBufferByteInput 类,它将 Netty 中的 ByteBuf 转换为 ByteInput,然后交给 Marshalling 的解码器进行处理。
  2. 编码过程
    • MarshallingEncoder 负责将 Java 对象(如 NettyMessagebody)序列化为字节流。在 NettyMessageEncoder 中,MarshallingEncoder 被用来将消息体的 body 字段序列化为字节流,并将其写入到 ByteBuf 中。
    • 序列化后,Netty 会通过 setInt 方法更新消息的长度字段,确保发送的字节流包含正确的长度信息。
4.3 Marshalling 与 Netty 的结合优势

Marshalling 编解码器与 Netty 集成,可以带来以下几个显著的优势:

  • 高效的对象序列化
    Marshalling 提供了一种高效的对象序列化机制,特别适用于复杂对象的序列化操作。在网络传输中,尽量减少数据的序列化开销是提升性能的关键。
  • 跨平台支持
    Marshalling 支持不同平台之间的数据交换,能够有效处理各种数据类型的转换问题。尤其是在分布式系统中,确保数据格式的一致性是至关重要的。
  • 与 Netty 的无缝集成
    Marshalling 与 Netty 的集成方式极其简便,利用 ChannelBufferByteInputChannelBufferByteOutput 类将 ByteBuf 转换为 ByteInputByteOutput,使得 Marshalling 可以无缝地与 Netty 配合使用。
4.4 数据包解析与消息处理

在实际应用中,数据包通常包含头部信息和消息体部分。NettyMessageDecoder 类实现了基于长度域的帧解码方式,它通过 LengthFieldBasedFrameDecoder 对消息进行拆解。具体来说,它从消息流中提取出消息头,校验字段(如 crcCode)、消息长度、会话 ID 等信息,并将消息体部分交给 MarshallingDecoder 进行解码。

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoder marshallingDecoder;

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        // 解码头部信息
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());
        
        int size = frame.readInt();
        // 解码附件
        if (size > 0) {
            Map<String, Object> attachments = new HashMap<>(size);
            for (int i = 0; i < size; i++) {
                byte[] keyBytes = new byte[frame.readInt()];
                frame.readBytes(keyBytes);
                String key = new String(keyBytes, "UTF-8");
                attachments.put(key, marshallingDecoder.decode(frame));
            }
            header.setAttachment(attachments);
        }

        // 解码消息体
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }

        message.setHeader(header);
        return message;
    }
}

在这里,消息的头部和体部分是通过不同的方式解码的。首先,解码头部字段,然后使用 MarshallingDecoder 解码消息体。


**2.3 **服务端模块代码分析

服务端模块是基于Netty框架搭建的,它主要承担以下几个功能:

  1. 连接管理:服务端负责接受客户端连接并分配处理线程。
  2. 消息解码与编码:通过解码器和编码器对网络传输的数据进行转换。
  3. 登录认证:通过 LoginAuthRespHandler 处理客户端的登录请求。
  4. 心跳机制:通过 HeartBeatRespHandler 处理客户端的心跳包。

1. NettyServer

NettyServer 是整个服务端应用的启动类,负责设置和启动Netty服务器。它的主要任务是创建两个 EventLoopGroup,分别处理客户端连接和请求。ServerBootstrap 是Netty中用于服务端的启动类,它配置了所有的管道处理器(handlers),包括消息解码、编码、以及业务处理。

代码分析:
public class NettyServer {

    public void bind() throws Exception {
        // 创建两个 EventLoopGroup,用于处理连接和请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // 处理客户端连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理客户端请求

        // 创建 ServerBootstrap,设置服务端的启动配置
        ServerBootstrap b = new ServerBootstrap();
        
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)  // 使用 NIO 模式的服务端套接字
         .option(ChannelOption.SO_BACKLOG, 100)  // 设置 TCP 参数,连接请求的等待队列长度
         .handler(new LoggingHandler(LogLevel.INFO))  // 设置日志处理器,记录服务端日志
         .childHandler(new ChannelInitializer<SocketChannel>() { // 子处理器
             @Override
             public void initChannel(SocketChannel ch) throws IOException {
                 // 添加消息解码器和编码器
                 ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                 ch.pipeline().addLast(new NettyMessageEncoder());
                 
                 // 添加业务逻辑处理器
                 ch.pipeline().addLast(new LoginAuthRespHandler());
                 ch.pipeline().addLast(new HeartBeatRespHandler());
             }
         });

        // 绑定端口并启动服务
        ChannelFuture cf = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
        
        // 等待服务器通道关闭
        cf.channel().closeFuture().sync();
    }
}
关键点解析:
  • EventLoopGroup:
    • bossGroup:管理客户端的连接请求。
    • workerGroup:负责处理具体的请求(IO操作和数据处理)。
  • ServerBootstrap
    • channel(NioServerSocketChannel.class):指定使用 NIO 的服务端套接字。
    • .option(ChannelOption.SO_BACKLOG, 100):设置连接请求的排队长度。
    • childHandler:设置连接后数据的处理逻辑,主要是解码、编码以及业务处理。
  • ChannelInitializer
    • 通过 initChannel 方法配置了数据的解码、编码以及业务处理的Handler。

2. LoginAuthRespHandler

LoginAuthRespHandler 主要负责客户端的登录请求处理。当客户端发起登录请求时,服务端通过该类验证客户端的身份。如果验证成功,返回登录成功的响应;否则,返回登录失败并关闭连接。

代码分析:
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {

    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whitekList = { "127.0.0.1", "192.168.1.200" };

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 检查是否是登录请求
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp = null;
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whitekList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
                if (isOK)
                    nodeCheck.put(nodeIndex, true);
            }
            System.out.println("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg); // 不是登录请求则继续传递消息
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}
代码解析:
  • channelRead
    • 在处理登录请求时,首先检查消息的类型是否是 LOGIN_REQ
    • 然后,检查客户端的IP是否在白名单中。如果在白名单中,则返回登录成功的响应,否则返回登录失败,并关闭连接。
  • buildResponse
    • 根据登录验证结果构建 NettyMessage 响应消息。
登录认证的作用:
  • 身份验证:确保只有合法的客户端才能连接到服务端。
  • 控制连接:通过白名单(whitekList)控制哪些客户端允许连接。

3. HeartBeatRespHandler

HeartBeatRespHandler 主要用于处理客户端发送的心跳请求。服务端通过该类处理来自客户端的心跳包,并及时返回响应,保持连接的活跃。

代码分析:
public class HeartBeatRespHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            System.out.println("Receive com.bfxy.netty.client heart beat message : ---> "
                    + message);
            NettyMessage heartBeat = buildHeatBeat();
            System.out.println("Send heart beat response message to com.bfxy.netty.client : ---> "
                    + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}
代码解析:
  • channelRead
    • 检查接收到的消息类型是否为心跳请求 HEARTBEAT_REQ
    • 如果是心跳包,则构造心跳响应并返回给客户端。

总结:

  • NettyServer:负责初始化服务端,设置管道处理器,并启动Netty服务监听指定端口。
  • LoginAuthRespHandler:处理客户端的登录请求,验证其身份并返回相应结果。
  • HeartBeatRespHandler:处理客户端的心跳请求,保持连接活跃。

通过这种方式,服务端能够有效地管理连接、处理认证和心跳机制,确保与客户端的通信稳定高效。


2.4 客户端模块分析

客户端模块负责与服务端进行通信,发送请求消息(如登录请求、业务请求等),并接收服务端的响应消息。客户端需要配置连接、消息编解码、业务逻辑处理以及心跳机制等。以下是对 com.bfxy.netty.client 包结构的详细分析,特别是 NettyClientLoginAuthReqHandlerHeartBeatReqHandler 类。

客户端模块涉及的主要类包括 NettyClient 启动类、登录认证处理器(LoginAuthReqHandler)和心跳处理器(HeartBeatReqHandler)。它们共同完成了客户端的初始化、连接管理、消息发送和响应接收等任务。

代码分析

NettyClient 类实现了以下功能:

  1. 客户端连接服务端:它通过 Bootstrap 启动客户端,并连接到服务端指定的 IP 和端口。
  2. 心跳机制和重连:客户端如果与服务端的连接断开,它会通过一个定时任务进行自动重连。
  3. 消息编解码:配置了消息的编解码器(NettyMessageDecoderNettyMessageEncoder)。
  4. 超时处理:添加了读取超时处理器(ReadTimeoutHandler),防止因连接超时导致的阻塞。
  5. 消息处理器:通过 LoginAuthReqHandlerHeartBeatReqHandler 处理登录请求和心跳请求。

详细代码解析

public class NettyClient {

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        try {
            // 创建 Netty 客户端启动类
            Bootstrap b = new Bootstrap();
            b.group(group) // 设置 EventLoopGroup 处理客户端事件
             .channel(NioSocketChannel.class) // 使用 NIO 模式的客户端套接字
             .option(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle 算法,确保小数据包能立即发送
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 配置客户端的 ChannelPipeline
                    ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); // 消息解码器
                    ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder()); // 消息编码器
                    ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); // 读取超时处理器
                    ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler()); // 登录认证处理器
                    ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler()); // 心跳请求处理器
                }
            });

            // 连接到服务端
            ChannelFuture future = b.connect(
                new InetSocketAddress(host, port), // 连接远程服务器
                new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT) // 本地绑定地址
            ).sync(); // 阻塞直到连接完成

            System.out.println("Client Start.. ");
            
            // 等待客户端关闭
            future.channel().closeFuture().sync();
        } finally {
            // 如果连接关闭或出现异常,执行重连操作
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1); // 延时重连
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP); // 递归调用连接方法实现重连
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        // 启动客户端并连接到服务端
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }
}

核心点解析

  1. Bootstrap 配置
    • group(group):指定事件循环组,用于处理 I/O 事件。这里使用了 NioEventLoopGroup,支持 NIO(非阻塞模式)。
    • .channel(NioSocketChannel.class):选择使用 NIO 模式的套接字通道。
    • .option(ChannelOption.TCP_NODELAY, true):禁用 Nagle 算法,这对于延迟敏感的应用程序(如实时通信)来说是一个常见的优化。
    • .handler(new ChannelInitializer<SocketChannel>() {...}):在管道中加入了多个处理器,包括消息编解码器、超时处理器、登录认证处理器和心跳请求处理器。
  2. 超时处理
    • ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));:如果客户端在 50 秒内没有读取到任何数据,ReadTimeoutHandler 会触发连接关闭。这是为了防止连接挂起或者长时间不活跃。
  3. 登录和心跳机制
    • LoginAuthReqHandler:用于处理登录认证请求,通常会在客户端启动时发起登录请求。
    • HeartBeatReqHandler:用于处理心跳请求,确保客户端和服务端之间的连接保持活跃。
  4. 重连机制
    • 你的代码中通过 ScheduledExecutorService 来定时执行重连操作。当连接关闭或出现异常时,它会延迟一秒钟后重新调用 connect 方法,尝试重新连接到服务端。这确保了客户端在断开连接时能够自动恢复。
  5. 递归调用
    • 如果连接断开,connect 方法会递归调用自身进行重连。你使用了 TimeUnit.SECONDS.sleep(1) 来延迟重连,这可以防止过于频繁的重连请求。

可能需要改进的地方

  • 重连策略优化:目前的重连策略是简单的递归调用。你可以考虑引入指数退避(Exponential Backoff)策略,逐步增加重连的时间间隔,以避免在网络问题持续时频繁重试。

    示例(指数退避):

    private int retryCount = 0;
    
    private void reconnect() {
        executor.execute(() -> {
            try {
                int delay = (int) Math.pow(2, retryCount); // 指数退避
                TimeUnit.SECONDS.sleep(delay);
                retryCount++;
                connect(NettyConstant.PORT, NettyConstant.REMOTEIP); // 尝试重新连接
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    
  • 资源清理:虽然在 finally 块中启动了重连线程,但 EventLoopGroup 并没有在程序结束时关闭。在 main 方法或者其他适当的地方,你应该确保 group.shutdownGracefully() 被调用,以释放 Netty 的资源。

    示例:

    group.shutdownGracefully();
    

总结

NettyClient 类实现了一个比较标准的客户端逻辑,能够连接到服务端、处理消息的编解码、认证和心跳机制,并具备基本的自动重连功能。你在连接断开时通过递归重连和定时任务的方式来确保客户端能够在断线后重新连接上服务端。

通过适当的重连优化和资源管理改进,可以进一步提升代码的稳定性和性能。

1.2 客户端连接和请求发送流程概述:
  1. 客户端启动时,创建一个 Bootstrap,配置 EventLoopGroup、通道类型(NioSocketChannel)等。
  2. 配置消息编解码器(NettyMessageDecoderNettyMessageEncoder)和业务逻辑处理器(如 LoginAuthReqHandler)。
  3. 客户端连接到服务端并发起连接请求。
  4. 如果连接成功,客户端可以发送业务请求或进行身份认证(登录)。
  5. 在收到响应消息后,客户端会根据消息类型进行相应的处理,如登录成功、心跳响应等。

2. LoginAuthReqHandler

LoginAuthReqHandler 负责客户端的登录请求处理。它构建并发送 LOGIN_REQ 消息,告知服务端进行身份认证。服务端在收到请求后会返回 LOGIN_RESP 消息,客户端根据该响应判断认证是否成功。以下是代码示例:

public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 客户端在连接成功后发送登录请求
        NettyMessage loginReq = buildLoginRequest();
        ctx.writeAndFlush(loginReq);
    }

    // 构建登录请求消息
    private NettyMessage buildLoginRequest() {
        NettyMessage request = new NettyMessage();
        request.setType(MessageType.LOGIN_REQ.value());
        request.setBody("Client Login Request");
        return request;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 判断服务端是否返回登录响应
        if (message.getType() == MessageType.LOGIN_RESP.value()) {
            String response = (String) message.getBody();
            System.out.println("Login Response: " + response);

            // 如果登录成功,启动心跳机制
            if (response.equals("Login Successful")) {
                // 启动心跳机制
                startHeartBeat(ctx);
            } else {
                // 登录失败,关闭连接
                ctx.close();
            }
        } else {
            super.channelRead(ctx, msg); // 不是登录响应则继续传递消息
        }
    }

    // 启动心跳请求
    private void startHeartBeat(ChannelHandlerContext ctx) {
        NettyMessage heartBeatReq = buildHeartBeatRequest();
        ctx.writeAndFlush(heartBeatReq);
    }

    // 构建心跳请求消息
    private NettyMessage buildHeartBeatRequest() {
        NettyMessage heartBeatReq = new NettyMessage();
        heartBeatReq.setType(MessageType.HEARTBEAT_REQ.value());
        return heartBeatReq;
    }
}
2.1 代码解析:
  • channelActive:
    • 当客户端与服务端建立连接时,客户端主动发送登录请求消息(LOGIN_REQ)到服务端。
  • buildLoginRequest:
    • 构建登录请求消息,设置消息类型为 LOGIN_REQ,并附带消息体(如登录请求的描述)。
  • channelRead:
    • 客户端收到服务端的响应消息(LOGIN_RESP)后,根据响应内容判断登录是否成功。
    • 如果登录成功,则启动心跳机制;如果登录失败,则关闭连接。
  • startHeartBeatbuildHeartBeatRequest:
    • 登录成功后,客户端发送心跳请求(HEARTBEAT_REQ),保持与服务端的连接活跃。
2.2 登录请求和心跳机制的作用:
  • 登录请求:确保客户端能够与服务端建立认证的通信链路,只有合法客户端才能访问服务端资源。
  • 心跳机制:客户端通过定期发送心跳请求,保持与服务端的连接活跃。如果心跳丢失,服务端可以断开不活跃的连接。

3. HeartBeatReqHandler 类分析

HeartBeatReqHandler 类继承自 ChannelInboundHandlerAdapter,用于管理客户端的心跳请求。它的主要功能是处理与心跳相关的消息,包括发送心跳请求和处理心跳响应。具体代码如下:

public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {

    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        
        // 登录成功后启动心跳任务
        if (message.getHeader() != null 
            && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            this.heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
        } 
        // 收到心跳响应,打印日志
        else if (message.getHeader() != null 
            && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
            System.out.println("Client received heart beat response: ---> " + message);
        } 
        else {
            ctx.fireChannelRead(msg); // 传递其他类型的消息
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heartBeat = buildHeartBeat();
            System.out.println("Client sending heart beat message: ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        }

        private NettyMessage buildHeartBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }
}
3.1 代码解析:
  • channelRead:
    • 客户端接收到服务端的响应消息时,检查是否为登录响应或心跳响应。
    • 如果是登录响应,启动定时任务定期发送心跳请求。
    • 如果是心跳响应,客户端输出日志,确认收到心跳响应。
  • HeartBeatTask:
    • 定期发送心跳请求(HEARTBEAT_REQ),确保与服务端的连接处于活跃状态。

4. 总结

  1. NettyClient:
    • 负责客户端的启动和连接配置,管理与服务端的连接。
  2. LoginAuthReqHandler:
    • 处理客户端的登录请求和响应,完成身份认证,并在成功后启动心跳机制。
  3. HeartBeatReqHandler:
    • 管理心跳请求和响应,确保客户端与服务端保持活跃连接。

客户端模块的核心职责是与服务端建立连接并保持通信活跃,通过身份认证和心跳机制确保通信的可靠性和持续性。


4. 客户端模块总结

  • NettyClient:负责客户端的连接、请求发送和响应接收。
  • LoginAuthReqHandler:处理客户端的登录请求和认证,确保连接安全。
  • HeartBeatReqHandler:发送心跳请求,确保与服务端的连接持续活跃。

客户端模块通过这几个主要的类,实现了客户端与服务端的认证、心跳机制、请求响应等核心功能。

5.运行结果

在这里插入图片描述

在这里插入图片描述

总结

项目的结构清晰地分为几个关键模块,分别负责协议定义、消息编解码、服务端管理、工具类和测试。每个模块通过职责划分,使得项目具有良好的扩展性和可维护性。希望这次详细分析对您理解项目结构有所帮助!


http://www.kler.cn/a/518498.html

相关文章:

  • flink写parquet解决timestamp时间格式字段问题
  • 每日 Java 面试题分享【第 13 天】
  • 狗狗能吃萝卜吗?
  • 网络安全 | F5-Attack Signatures详解
  • 【C++高并发服务器WebServer】-7:共享内存
  • 奇怪的单词(快速扩张200个单词)
  • 基于Flask的天猫美妆销售数据分析系统的设计与实现
  • PortSwigger靶场练习---跨站点请求伪造:CSRF vulnerability with no defenses没有防御措施的 CSRF 漏洞
  • 导出地图为pdf文件
  • [极客大挑战 2019]Upload1
  • 假期学习【Java程序】的实施方案
  • C#标准Mes接口框架(持续更新)
  • 三分钟简单了解一些HTML的标签和语法_02
  • 技术总结:FPGA基于GTX+RIFFA架构实现多功能SDI视频转PCIE采集卡设计方案
  • Linux 命令行网络连接指南
  • AIGC的企业级解决方案架构及成本效益分析
  • RocketMQ 的 Topic 和消息队列MessageQueue信息,是怎么分布到Broker的?怎么负载均衡到Broker的?
  • 数据结构——二叉树——堆(1)
  • 【后端开发】字节跳动青训营之性能分析工具pprof
  • 正则表达式以及Qt中的使用
  • 为什么UI导入png图会出现白边
  • Zbrush导入笔刷
  • Android中Service在新进程中的启动流程
  • “AI视觉贴装系统:智能贴装,精准无忧
  • 《论文翻译》KIMI K1.5:用大语言模型扩展强化学习
  • 保存复合型数据到h5文件