Netty解决粘包半包问题
1.定长,每次读取固定的数据量
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10)); // 每条消息长度固定为10字节
pipeline.addLast(new YourBusinessHandler());
每条消息长度固定,接收端读取固定字节数作为一个完整的消息。
-
粘包问题: 即使多个消息被合并在一起,定长解码器可以通过固定的长度正确拆分数据。
-
半包问题: 如果数据不完整,Netty 会等待剩余数据到达再进行组装。
-
不灵活,只适用于固定长度的协议。
如果消息长度不一致,需要填充或裁剪数据,浪费存储空间。
2.分隔符
每条消息使用特定的分隔符(如 \n
)进行分隔。
粘包问题: 分隔符明确了每条消息的边界,无论消息是否粘连都可以正确拆分。
半包问题: 如果分隔符未到达,Netty 会缓冲当前数据,等待剩余数据到达后组装完整消息。
-
如果分隔符是消息内容的一部分,可能会导致解析错误。
-
每条消息都需要附加分隔符,增加了一些开销。
ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); // 使用换行符作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
pipeline.addLast(new YourBusinessHandler());
3.基于长度字段的解决方案
消息头中包含一个字段表示消息体的长度,接收端根据长度字段解析完整的消息。
粘包问题: 长度字段明确了每条消息的大小,即使多条消息粘连,解码器可以逐条解析。
半包问题: 如果接收的数据不足以包含完整消息,Netty 会缓冲数据,等待剩余部分到达再处理。
-
适用于变长消息,灵活且高效。
-
不依赖分隔符,节省了数据的额外开销。
-
需要协议设计时明确定义长度字段。
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(
1024, 0, 4, 0, 4));
// 最大帧长度1024,长度字段偏移量为0,长度字段长度为4字节,无附加偏移,去掉长度字段的头部字节
pipeline.addLast(new YourBusinessHandler());
一共五个参数
maxFrameLength:帧的最大长度
lengthFieldOffset:长度的偏移量,用于获取长度,比如你先写入长度4个字节,那就无需偏移,因为是先写入的,所以直接读长度的字节大小可以直接得到。如果先写了header,比如CAFE占用了1个字节,那长度偏移量就得是1了,此时他会跳过从头开始数的第一个字节,然后读取消息的长度。有了消息的长度就可以读取消息了。
lengthFieldLength:长度占用的字节大小,跟上面lengthFieldOffset的来确定发送消息的长度。
initialBytesstrip:剥离字节长度,比如我要剥离掉这个header1字节,还有这个记录长度的字节4字节,我需要指定剥离的字节大小(1字节+4字节),就能只留下消息了。
lengthAdjustment:他指的是从长度之后应该跳过几个字节的内容,比如我在长度和消息之间又加了一个版本号1字节
现在组成:header 1字节 ,长度 4字节 ,版本号 1字节 ,消息("helloworld")
此时maxFrameLength=1024
lengthFieldOffset=1(跳过header)
lengthFieldLength=4(长度的字节大小)
lengthAdjustment=1(长度以后跳过1个字节才是消息)
initialBytesstrip=6 (去掉header、长度、版本号共6个字节)此时才会得到真正的helloworld
4.自定义协议
如果协议复杂或者不符合上述通用解码器的场景,可以手动编写解码器。
通过分析接收到的数据流,根据协议规则解析完整的消息。
粘包问题: 自定义逻辑中明确解析每条消息的边界。
半包问题: 使用 Netty 提供的缓冲区特性,确保接收完整数据后再解析。
@Slf4j
public class ProtocolMessageEncoderDecoder extends ByteToMessageCodec<ProtocolMessage<?>> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage<?> protocolMessage, ByteBuf byteBuf) throws Exception {
//判断protocolMessage为空
if(protocolMessage == null || protocolMessage.getHeader() == null){
byteBuf.writeBytes(new byte[]{});
}
//得到请求头
ProtocolMessage.Header header = protocolMessage.getHeader();
//拼装请求
//魔数 1字节
byteBuf.writeBytes(new byte[]{header.getMagic()});
//版本号 1字节
byteBuf.writeBytes(new byte[]{header.getVersion()});
//序列化器 1字节
byteBuf.writeBytes(new byte[]{header.getSerializer()});
//类型 1字节
byteBuf.writeBytes(new byte[]{header.getType()});
//状态 1字节
byteBuf.writeBytes(new byte[]{header.getStatus()});
//请求id 8字节
byteBuf.writeLong(header.getRequestId());
//消息体长度 4字节
// byteBuf.writeInt(header.getBodyLength());
// 获取序列化器
ProtocolMessageSerializerEnum enumByKey = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if(enumByKey == null){
throw new RuntimeException("序列化协议不存在:"+ enumByKey.getValue());
}
//利用key得到序列化器
Serializer serializer = SerializerFactory.getInstance(enumByKey.getValue());
//序列化请求体
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
//写入请求体长度
byteBuf.writeInt(bodyBytes.length);
//写入请求体
byteBuf.writeBytes(bodyBytes);
//完成自定义协议编码
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
//魔数
byte magic = byteBuf.readByte();
//校验魔数
if(magic != ProtocolConstant.PROTOCOL_MAGIC){
// throw new RuntimeException("消息 magic 非法" + magic);
throw new RpcException(ErrorCode.ConsumerError,"消息 magic 非法" + magic);
}
//版本号
byte version = byteBuf.readByte();
//序列化器
byte serializer = byteBuf.readByte();
//类型
byte type = byteBuf.readByte();
//状态
byte status = byteBuf.readByte();
//请求id
long RequestId = byteBuf.readLong();
//消息体长度
int BodyLength = byteBuf.readInt();
//写入header
header.setMagic(magic);
header.setSerializer(serializer);
header.setVersion(version);
header.setType(type);
header.setStatus(status);
header.setRequestId(RequestId);
header.setBodyLength(BodyLength);
//获得请求体数据,以上一共17个字节
byte[] bodyBytes = new byte[BodyLength];
//写入请求体数据, 解决粘包问题,只读指定长度的数据
byteBuf.readBytes(bodyBytes,0,BodyLength);
//解析消息体
// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if(serializerEnum == null){
throw new RuntimeException("序列化消息的协议不存在");
}
//得到序列化器
Serializer serializer1 = SerializerFactory.getInstance(serializerEnum .getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
if (messageTypeEnum == null) {
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum) {
case REQUEST:
RpcRequest request = serializer1.deserialize(bodyBytes, RpcRequest.class);
list.add(new ProtocolMessage<>(header,request));
return;
case RESPONSE:
RpcResponse response = serializer1.deserialize(bodyBytes, RpcResponse.class);
list.add(new ProtocolMessage<>(header,response));
return;
case HEART_BEAT:return;
case OTHERS:return;
default:
throw new RuntimeException("暂不支持该消息类型");
}
}
}
灵活,适用于复杂协议。
方法 | 粘包问题解决 | 半包问题解决 | 使用场景 | 优缺点 |
---|---|---|---|---|
定长消息 | 精确分割 | 等待补充数据 | 消息固定长度的协议 | 简单但不灵活,适合定长数据 |
分隔符 | 根据分隔符拆分 | 等待完整数据 | 使用明确分隔符的协议 | 实现简单,但需要额外的分隔符 |
长度字段 | 按长度截取 | 等待完整数据 | 消息中包含长度字段的协议 | 灵活高效,适合大多数场景 |
自定义解码器 | 自行定义逻辑 | 自行定义逻辑 | 协议复杂或不规则 | 灵活性最高,但开发成本高 |