当前位置: 首页 > article >正文

Netty解决粘包半包问题

1.定长,每次读取固定的数据量

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10)); // 每条消息长度固定为10字节
pipeline.addLast(new YourBusinessHandler());

每条消息长度固定,接收端读取固定字节数作为一个完整的消息。

  • 粘包问题: 即使多个消息被合并在一起,定长解码器可以通过固定的长度正确拆分数据。

  • 半包问题: 如果数据不完整,Netty 会等待剩余数据到达再进行组装。

  • 不灵活,只适用于固定长度的协议。

    如果消息长度不一致,需要填充或裁剪数据,浪费存储空间。

2.分隔符

每条消息使用特定的分隔符(如 \n)进行分隔。

粘包问题: 分隔符明确了每条消息的边界,无论消息是否粘连都可以正确拆分。

半包问题: 如果分隔符未到达,Netty 会缓冲当前数据,等待剩余数据到达后组装完整消息。

  • 如果分隔符是消息内容的一部分,可能会导致解析错误。

  • 每条消息都需要附加分隔符,增加了一些开销。

ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); // 使用换行符作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
pipeline.addLast(new YourBusinessHandler());

3.基于长度字段的解决方案

消息头中包含一个字段表示消息体的长度,接收端根据长度字段解析完整的消息。

粘包问题: 长度字段明确了每条消息的大小,即使多条消息粘连,解码器可以逐条解析。

半包问题: 如果接收的数据不足以包含完整消息,Netty 会缓冲数据,等待剩余部分到达再处理。

  • 适用于变长消息,灵活且高效。

  • 不依赖分隔符,节省了数据的额外开销。

  • 需要协议设计时明确定义长度字段。

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(
    1024, 0, 4, 0, 4)); 
    // 最大帧长度1024,长度字段偏移量为0,长度字段长度为4字节,无附加偏移,去掉长度字段的头部字节
pipeline.addLast(new YourBusinessHandler());

一共五个参数

maxFrameLength:帧的最大长度

lengthFieldOffset:长度的偏移量,用于获取长度,比如你先写入长度4个字节,那就无需偏移,因为是先写入的,所以直接读长度的字节大小可以直接得到。如果先写了header,比如CAFE占用了1个字节,那长度偏移量就得是1了,此时他会跳过从头开始数的第一个字节,然后读取消息的长度。有了消息的长度就可以读取消息了。

lengthFieldLength:长度占用的字节大小,跟上面lengthFieldOffset的来确定发送消息的长度。

initialBytesstrip:剥离字节长度,比如我要剥离掉这个header1字节,还有这个记录长度的字节4字节,我需要指定剥离的字节大小(1字节+4字节),就能只留下消息了。

lengthAdjustment:他指的是从长度之后应该跳过几个字节的内容,比如我在长度和消息之间又加了一个版本号1字节

现在组成:header 1字节 ,长度 4字节 ,版本号 1字节 ,消息("helloworld")

此时maxFrameLength=1024

lengthFieldOffset=1(跳过header)

lengthFieldLength=4(长度的字节大小)

lengthAdjustment=1(长度以后跳过1个字节才是消息)

initialBytesstrip=6 (去掉header、长度、版本号共6个字节)此时才会得到真正的helloworld

4.自定义协议

如果协议复杂或者不符合上述通用解码器的场景,可以手动编写解码器。

通过分析接收到的数据流,根据协议规则解析完整的消息。

粘包问题: 自定义逻辑中明确解析每条消息的边界。

半包问题: 使用 Netty 提供的缓冲区特性,确保接收完整数据后再解析。

@Slf4j
public class ProtocolMessageEncoderDecoder extends ByteToMessageCodec<ProtocolMessage<?>> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage<?> protocolMessage, ByteBuf byteBuf) throws Exception {
        //判断protocolMessage为空
        if(protocolMessage == null || protocolMessage.getHeader() == null){
            byteBuf.writeBytes(new byte[]{});
        }
        //得到请求头
        ProtocolMessage.Header header = protocolMessage.getHeader();

        //拼装请求
        //魔数 1字节
        byteBuf.writeBytes(new byte[]{header.getMagic()});


        //版本号 1字节
        byteBuf.writeBytes(new byte[]{header.getVersion()});
        //序列化器 1字节
        byteBuf.writeBytes(new byte[]{header.getSerializer()});
        //类型 1字节
        byteBuf.writeBytes(new byte[]{header.getType()});
        //状态 1字节
        byteBuf.writeBytes(new byte[]{header.getStatus()});
        //请求id 8字节
        byteBuf.writeLong(header.getRequestId());
        //消息体长度 4字节
//        byteBuf.writeInt(header.getBodyLength());
        // 获取序列化器
        ProtocolMessageSerializerEnum enumByKey = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(enumByKey == null){
            throw new RuntimeException("序列化协议不存在:"+ enumByKey.getValue());
        }
        //利用key得到序列化器
        Serializer serializer = SerializerFactory.getInstance(enumByKey.getValue());
        //序列化请求体
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        //写入请求体长度
        byteBuf.writeInt(bodyBytes.length);
        //写入请求体
        byteBuf.writeBytes(bodyBytes);
        //完成自定义协议编码
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        //魔数
        byte magic = byteBuf.readByte();
        //校验魔数
        if(magic != ProtocolConstant.PROTOCOL_MAGIC){
//            throw new RuntimeException("消息 magic 非法" + magic);
            throw new RpcException(ErrorCode.ConsumerError,"消息 magic 非法" + magic);
        }
        //版本号
        byte version = byteBuf.readByte();
        //序列化器
        byte serializer = byteBuf.readByte();
        //类型
        byte type = byteBuf.readByte();
        //状态
        byte status = byteBuf.readByte();
        //请求id
        long RequestId = byteBuf.readLong();
        //消息体长度
        int BodyLength = byteBuf.readInt();
        //写入header
        header.setMagic(magic);
        header.setSerializer(serializer);
        header.setVersion(version);
        header.setType(type);
        header.setStatus(status);
        header.setRequestId(RequestId);

        header.setBodyLength(BodyLength);
        //获得请求体数据,以上一共17个字节
        byte[] bodyBytes = new byte[BodyLength];

        //写入请求体数据, 解决粘包问题,只读指定长度的数据
        byteBuf.readBytes(bodyBytes,0,BodyLength);
        //解析消息体
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum   = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(serializerEnum  == null){
        throw new RuntimeException("序列化消息的协议不存在");
        }
        //得到序列化器
        Serializer serializer1 = SerializerFactory.getInstance(serializerEnum .getValue());
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request  = serializer1.deserialize(bodyBytes, RpcRequest.class);
                list.add(new ProtocolMessage<>(header,request));
                return;
            case RESPONSE:
                RpcResponse response = serializer1.deserialize(bodyBytes, RpcResponse.class);
                list.add(new ProtocolMessage<>(header,response));
                return;
            case HEART_BEAT:return;
            case OTHERS:return;
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }


}

灵活,适用于复杂协议。

方法粘包问题解决半包问题解决使用场景优缺点
定长消息精确分割等待补充数据消息固定长度的协议简单但不灵活,适合定长数据
分隔符根据分隔符拆分等待完整数据使用明确分隔符的协议实现简单,但需要额外的分隔符
长度字段按长度截取等待完整数据消息中包含长度字段的协议灵活高效,适合大多数场景
自定义解码器自行定义逻辑自行定义逻辑协议复杂或不规则灵活性最高,但开发成本高

http://www.kler.cn/a/447938.html

相关文章:

  • vue iframe进行父子页面通信并切换URL
  • 服务器数据恢复—V7000存储中多块磁盘出现故障导致业务中断的数据恢复案例
  • ROS1安装教程
  • 基于Spring Boot的智慧农业专家远程指导系统
  • 练习题 最小栈
  • 用SparkSQL和PySpark完成按时间字段顺序将字符串字段中的值组合在一起分组显示
  • Spring常见问题
  • OpenHarmony-6.IPC/RPC组件
  • 无人机飞防高效率喷洒技术详解
  • 用音乐与自我对话 ——澄迈漓岛音乐节x草台回声
  • Deepin和Windows传文件(Xftp,WinSCP)
  • AI的进阶之路:从机器学习到深度学习的演变(四)
  • 【Android】unzip aar删除冲突classes再zip
  • <QNAP 453D QTS-5.x> 日志记录:Docker 运行的 Flask 应用 SSL 证书 过期, 更新证书
  • 数据结构 C/C++(实验五:图)
  • 【SH】在Ubuntu Server 24中基于Python Web应用的Flask Web开发(实现POST请求)学习笔记
  • 基于Spring Boot的动漫交流与推荐平台
  • Cadence学习笔记 8 添加分页符
  • Vue CLI 脚手架创建项目流程详解 (2)
  • 【git】git命令
  • 《Java源力物语》-2.异常训练场
  • 易语言OCR银行卡文字识别
  • 【Java基础面试题030】Java和Go的区别?
  • EGO Swarm翻译
  • SPL06 基于stm32F103 HAL库驱动(软件模拟IIC)
  • 设计模式之 abstract factory