netty中黏包,半包
什么是黏包,半包
黏包:packet stick,,,接收端一次性接收了很多条完整的消息
半包 : packet fragment ,,, 接收端一次只读到了一部分消息,不是完整的
滑动窗口: 一个tcp协议的请求,是要等服务器的ack回应的,,而滑动窗口允许在窗口内的请求不用等到ack回应,也能继续往后开新的请求发送数据,提高了数据传输效率。。滑动窗口是自适应的,不用自己设置
黏包
-
netty使用
ByteBuf
作为缓冲区来存储tcp传递的数据,,当数据到达netty的Channel时,会被读入ByteBuf中,,这个缓冲区的大小,会产生黏包,半包问题 -
nagle算法
:传输层
和ip层
都会对数据加一个报头,,ip层的报头最少占20字节,传输层的报头也最少20字节,,就算发送一个字节的数据,也要带上这么多字节的报头,,nagle算法会尽可能多的去发送数据,,或者等攒够了一批才会发送数据,,,这会产生黏包问题
半包
- 接收方的滑动窗口比较小,一次性接收不了那么大的数据,要等到ack回应了之后,才能接收下面的数据,,就会半包
- MSS限制:maximum segment size, 不同的网卡,对数据包的大小是有限制的,一般的网卡传输限制是1500字节(MTU),除开tcp/ip的报文头,,还有1460字节(MSS),,如果超过了这个限制,就会把数据拆分…。。回环地址localhost没有对mss的限制
- 接收方的ByteBuf特别小,被迫拆分
netty处理黏包半包
-
FixedLengthFrameDecoder
定长消息解码器, : 凑够这个长度才会解码,,把最长消息的长度设置到这个解码器中,,其他不够长的消息,补充字节数,凑够这个固定长度,,再发送,,
只有凑够了这个长度,,才会去解码,,,这样虽然解决了黏包,半包问题,,但是需要补充很多空余的字符,,这些字符原本是不需要的,就造成了空间浪费 -
LineBasedFrameDecoder
: 以换行符作为分割标志,,\n
,\r\n
, 这里面会设置一个最大长度,,如果超过这个长度,还没找到换行符就会报错 -
DelimiterBasedFrameEncoder
: 自定义分割符
上面两种都要去遍历每个字符,找是不是分割符,,效率很低 -
LengthFieldBasedFrameDecoder
: 前面有一部分来约定好传输的字节长度,版本,等信息,后面根据这个长度解析对应长度的字节
这个类的构造函数有四个参数:- lengthFieldOffset : 前面有多少个多余的字节,才到描述数据长度的字节
- lengthFieldLength : 有多少个字节用来表述 数据的长度
- lengthAdjustment: 长度调整: 距离多少字节到真实的数据,,,描述数据长度的字节 和 存数据的字节 不一定是挨在一起的
- initialBytesToStrip: 从头剥离几个字节,,这些描述内容的信息(长度,版本)等,如果不需要可以剥离,如果设置之后就会被剥离掉
LengthFieldBasedFrameDecoder
: 是用的最多的
协议的设计和解析
redis协议
public static void main(String[] args) {
// 回车符是13 ,,换行符是10
final byte[] LINE = {13,10};
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
/**
* 连接成功之后执行
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
// *3 发送命令的数组元素有几个
buf.writeBytes("*3".getBytes());
// 回车换行
buf.writeBytes(LINE);
// 每个命令的键的长度
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
// 四个字节
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("waterkid".getBytes());
buf.writeBytes(LINE);
// 发送数据到redis
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// netty接收redis返回的数据,变成ByteBuf
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379);
// channelFuture.sync();
// channelFuture.channel().closeFuture()
}
netty处理http协议
netty中提供了很多现成的协议,,比如redis,http等,可以直接调用处理
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(bossGroup,workerGroup);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
// codec : 既能解码,,也能编码
// 经过这个编解码器处理之后,会返回两种类型的对象
// HttpRequest: 包含请求行和请求头
// HttpContent: 包含请求体 ===》 get请求也会被解析成两个
ch.pipeline().addLast(new HttpServerCodec());
// ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("msg.getClass() = " + msg.getClass());
//
// }
// });
// SimpleChannelInboundHandler : 只会处理指定类型的数据,,指定的类型在范型中
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
System.out.println(msg.uri());
// 返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "hello".getBytes();
// 告诉浏览器,返回的内容有多长,,不然他会一直转圈等待
response.headers().setInt("content-length",bytes.length);
response.content().writeBytes(bytes);
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
// channelFuture.channel().closeFuture().sync();
}
自定义协议
可以自己定义一个协议(共同遵守的约定),,
自定义协议需要约定的内容:
- 魔数: 用来第一时间判断是否是无效的数据包,,类似java的cafe babe
- 版本号: 支持协议的升级
- 序列化算法: 消息正文采用哪种序列化和反序列化方式,,常用的是json,或者是二进制对象流
- 指令类型: 这个指令是什么类型,登录,注册,单聊,还是群聊
- 请求序号 : 用来异步操作的,,确保请求的唯一性,服务器可以按照序号恢复顺序
- 正文长度: 正文有多少字节
Message类是自定义消息的父类,,下面有若干的子类:
@Slf4j
// 将来将ByteBuf 和 这个范型之间相互转换
public class MessageCodec extends ByteToMessageCodec<Message> {
/**
* 魔数: 第一时间判定是否是无效的数据包 。。。
* 版本号
* 序列化算法
* 指令类型: 是登录,注册,单聊,群聊
* 请求序号 : 为了双工通信,提供异步能力
* 正文长度
* 消息正文
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
// 编码 ,, 将message 编码成 ByteBuf
// 将消息写入到 Bytebuf中
out.writeBytes(new byte[]{1,2,3,4}); // 魔数
// 版本
out.writeByte(1);
// 序列化算法 0:表示jdk序列化 1:表示json序列化
out.writeByte(0);
// 写入一个字节的 指令类型
out.writeByte(message.getMessageType());
// 四个字节的 请求序号
out.writeInt(message.getSequenceId());
// 无意义,,对齐填充,,, 2的次幂
out.writeByte(0xff);
// 正文长度
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(bos);
outputStream.writeObject(message);
byte[] bytes = bos.toByteArray();
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
// 解码 ,,将ByteBuf变成 message
int magicNumber = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
// 指令类型
byte messageType = in.readByte();
// 消息序号
int sequenceId = in.readInt();
// 无意义字符
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes,0,length);
if (serializerType == 0){
ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) inputStream.readObject();
log.debug("{},{},{},{},{},{}",magicNumber,version,serializerType,messageType,sequenceId,length);
System.out.println("message = " + message);
// 解码出来的结果,,要存到参数里面去,,不然后面的handler无法拿到解码后的结果
out.add(message);
}
}
}
测试:
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
// 解决半包,,半包读取会有问题
new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
new LoggingHandler(),new MessageCodec());
LoginRequestMessage loginRequestMessage = new LoginRequestMessage();
loginRequestMessage.setUsername("waterkid");
loginRequestMessage.setPassword("123");
channel.writeOutbound(loginRequestMessage);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null,loginRequestMessage,buf);
channel.writeInbound(buf); // writeInbound() 写出数据会将 ByteBuf 的引用计数变成0 ,,会被释放掉,,因为ByteBuf是堆外内存,是零拷贝,,如果共用一个内存,就会有问题
// 需要 retain 将引用计数+1
}
名词
流式协议: stream oriented protocol : 消息之间是没有边界的,需要自己去拆解