当前位置: 首页 > article >正文

netty中黏包,半包

什么是黏包,半包

黏包:packet stick,,,接收端一次性接收了很多条完整的消息
半包 : packet fragment ,,, 接收端一次只读到了一部分消息,不是完整的

滑动窗口: 一个tcp协议的请求,是要等服务器的ack回应的,,而滑动窗口允许在窗口内的请求不用等到ack回应,也能继续往后开新的请求发送数据,提高了数据传输效率。。滑动窗口是自适应的,不用自己设置

黏包
  • netty使用ByteBuf作为缓冲区来存储tcp传递的数据,,当数据到达netty的Channel时,会被读入ByteBuf中,,这个缓冲区的大小,会产生黏包,半包问题

  • nagle算法传输层ip层都会对数据加一个报头,,ip层的报头最少占20字节,传输层的报头也最少20字节,,就算发送一个字节的数据,也要带上这么多字节的报头,,nagle算法会尽可能多的去发送数据,,或者等攒够了一批才会发送数据,,,这会产生黏包问题

半包
  • 接收方的滑动窗口比较小,一次性接收不了那么大的数据,要等到ack回应了之后,才能接收下面的数据,,就会半包
  • MSS限制:maximum segment size, 不同的网卡,对数据包的大小是有限制的,一般的网卡传输限制是1500字节(MTU),除开tcp/ip的报文头,,还有1460字节(MSS),,如果超过了这个限制,就会把数据拆分…。。回环地址localhost没有对mss的限制
  • 接收方的ByteBuf特别小,被迫拆分
netty处理黏包半包
  • FixedLengthFrameDecoder定长消息解码器, : 凑够这个长度才会解码,,把最长消息的长度设置到这个解码器中,,其他不够长的消息,补充字节数,凑够这个固定长度,,再发送,,
    只有凑够了这个长度,,才会去解码,,,这样虽然解决了黏包,半包问题,,但是需要补充很多空余的字符,,这些字符原本是不需要的,就造成了空间浪费

  • LineBasedFrameDecoder : 以换行符作为分割标志,,\n ,\r\n, 这里面会设置一个最大长度,,如果超过这个长度,还没找到换行符就会报错

  • DelimiterBasedFrameEncoder: 自定义分割符
    上面两种都要去遍历每个字符,找是不是分割符,,效率很低

  • LengthFieldBasedFrameDecoder : 前面有一部分来约定好传输的字节长度,版本,等信息,后面根据这个长度解析对应长度的字节
    这个类的构造函数有四个参数:

    • lengthFieldOffset : 前面有多少个多余的字节,才到描述数据长度的字节
    • lengthFieldLength : 有多少个字节用来表述 数据的长度
    • lengthAdjustment: 长度调整: 距离多少字节到真实的数据,,,描述数据长度的字节 和 存数据的字节 不一定是挨在一起的
    • initialBytesToStrip: 从头剥离几个字节,,这些描述内容的信息(长度,版本)等,如果不需要可以剥离,如果设置之后就会被剥离掉

LengthFieldBasedFrameDecoder : 是用的最多的

协议的设计和解析
redis协议
   public static void main(String[] args) {

        // 回车符是13 ,,换行符是10
        final byte[] LINE = {13,10};
        NioEventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler());
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    /**
                     * 连接成功之后执行
                     * @param ctx
                     * @throws Exception
                     */
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {

                        ByteBuf buf = ctx.alloc().buffer();
                        // *3 发送命令的数组元素有几个
                        buf.writeBytes("*3".getBytes());
                        // 回车换行
                        buf.writeBytes(LINE);
                        // 每个命令的键的长度
                        buf.writeBytes("$3".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("set".getBytes());
                        buf.writeBytes(LINE);

                        // 四个字节
                        buf.writeBytes("$4".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("name".getBytes());
                        buf.writeBytes(LINE);

                        buf.writeBytes("$8".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("waterkid".getBytes());
                        buf.writeBytes(LINE);

                        // 发送数据到redis
                        ctx.writeAndFlush(buf);
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        // netty接收redis返回的数据,变成ByteBuf
                        ByteBuf byteBuf = (ByteBuf) msg;
                        System.out.println(byteBuf.toString(Charset.defaultCharset()));
                    }
                });

            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 6379);
//        channelFuture.sync();
//        channelFuture.channel().closeFuture()
    }
netty处理http协议

netty中提供了很多现成的协议,,比如redis,http等,可以直接调用处理

  public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(bossGroup,workerGroup);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler());
                // codec : 既能解码,,也能编码
                // 经过这个编解码器处理之后,会返回两种类型的对象
                // HttpRequest: 包含请求行和请求头
                // HttpContent: 包含请求体    ===》 get请求也会被解析成两个
                ch.pipeline().addLast(new HttpServerCodec());

//                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//                    @Override
//                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                        System.out.println("msg.getClass() = " + msg.getClass());
//
//                    }
//                });

                // SimpleChannelInboundHandler : 只会处理指定类型的数据,,指定的类型在范型中
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                        System.out.println(msg.uri());

                        // 返回响应
                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK);
                        byte[] bytes = "hello".getBytes();
                        // 告诉浏览器,返回的内容有多长,,不然他会一直转圈等待
                        response.headers().setInt("content-length",bytes.length);
                        response.content().writeBytes(bytes);

                        ctx.writeAndFlush(response);
                    }
                });
            }
        });

        ChannelFuture channelFuture = serverBootstrap.bind(8080);
//        channelFuture.channel().closeFuture().sync();
    }
自定义协议

可以自己定义一个协议(共同遵守的约定),,
自定义协议需要约定的内容:

  • 魔数: 用来第一时间判断是否是无效的数据包,,类似java的cafe babe
  • 版本号: 支持协议的升级
  • 序列化算法: 消息正文采用哪种序列化和反序列化方式,,常用的是json,或者是二进制对象流
  • 指令类型: 这个指令是什么类型,登录,注册,单聊,还是群聊
  • 请求序号 : 用来异步操作的,,确保请求的唯一性,服务器可以按照序号恢复顺序
  • 正文长度: 正文有多少字节

Message类是自定义消息的父类,,下面有若干的子类:
在这里插入图片描述

@Slf4j
// 将来将ByteBuf  和  这个范型之间相互转换
public class MessageCodec extends ByteToMessageCodec<Message> {
    /**
     * 魔数:  第一时间判定是否是无效的数据包 。。。
     * 版本号
     * 序列化算法
     * 指令类型:  是登录,注册,单聊,群聊
     * 请求序号 : 为了双工通信,提供异步能力
     * 正文长度
     * 消息正文
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
        // 编码 ,, 将message 编码成  ByteBuf

        // 将消息写入到  Bytebuf中

        out.writeBytes(new byte[]{1,2,3,4}); // 魔数
        // 版本
        out.writeByte(1);

        // 序列化算法   0:表示jdk序列化   1:表示json序列化
        out.writeByte(0);

        // 写入一个字节的 指令类型
        out.writeByte(message.getMessageType());

        // 四个字节的 请求序号
        out.writeInt(message.getSequenceId());

        // 无意义,,对齐填充,,, 2的次幂
        out.writeByte(0xff);

        // 正文长度
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream outputStream = new ObjectOutputStream(bos);
        outputStream.writeObject(message);


        byte[] bytes = bos.toByteArray();

        out.writeInt(bytes.length);

        // 写入内容
        out.writeBytes(bytes);


    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
            // 解码  ,,将ByteBuf变成  message


        int magicNumber = in.readInt();

        byte version = in.readByte();

        byte serializerType = in.readByte();


        // 指令类型
        byte messageType = in.readByte();

        // 消息序号
        int sequenceId = in.readInt();

        // 无意义字符
        in.readByte();

        int length = in.readInt();

        byte[] bytes = new byte[length];
        in.readBytes(bytes,0,length);


        if (serializerType == 0){

            ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) inputStream.readObject();

            log.debug("{},{},{},{},{},{}",magicNumber,version,serializerType,messageType,sequenceId,length);
            System.out.println("message = " + message);

            // 解码出来的结果,,要存到参数里面去,,不然后面的handler无法拿到解码后的结果
            out.add(message);
        }

    }
}

测试:

   public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                // 解决半包,,半包读取会有问题
                new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
                new LoggingHandler(),new MessageCodec());

        LoginRequestMessage loginRequestMessage = new LoginRequestMessage();
        loginRequestMessage.setUsername("waterkid");
        loginRequestMessage.setPassword("123");

        channel.writeOutbound(loginRequestMessage);


        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null,loginRequestMessage,buf);

        channel.writeInbound(buf);  // writeInbound()  写出数据会将 ByteBuf 的引用计数变成0 ,,会被释放掉,,因为ByteBuf是堆外内存,是零拷贝,,如果共用一个内存,就会有问题
        // 需要 retain 将引用计数+1

    }
名词

流式协议: stream oriented protocol : 消息之间是没有边界的,需要自己去拆解


http://www.kler.cn/a/588170.html

相关文章:

  • HDR图像处理:色调映射和色域映射参数
  • 蓝桥杯好题推荐----最大字段和
  • 使用生成对抗网络(GAN)进行人脸老化生成的Python示例
  • 【机器学习chp13--(下)】人工神经网络—优化算法
  • HCIP笔记整理(一)
  • conda的基本使用及pycharm里设置conda环境
  • AI绘画软件Stable Diffusion详解教程(11):图生图进阶篇(局部用上传蒙版重绘)
  • 7个 Vue 路由守卫的执行顺序
  • 为训练大模型而努力-分享2W多张卡通头像的图片
  • 蓝桥杯真题——洛谷 day 9 枚举、贪心、找规律
  • C语言之数据结构 顺序表的实现
  • 网页制作代码html制作一个网页模板
  • 【Agent】OpenManus-Tool 详细分析
  • 一周学会Flask3 Python Web开发-SQLAlchemy删除数据操作-班级模块
  • Ubuntu 下有线网络图标消失及无法连接网络的解决方案
  • Java 多线程编程:提升系统并发处理能力!
  • Touch panel功能不良分析
  • RAG的工作原理以及案例列举
  • 2.8滑动窗口专题:最小覆盖子串
  • “全志V821:智能玩具的理想之选”——科技赋能,乐趣升级