当前位置: 首页 > article >正文

netty十八罗汉之——挖耳罗汉(Decoder)

佛教中除不听各种淫邪声音之外,更不可听别人的秘密。因他论耳根最到家,故取挖耳之形,以示耳根清净。

来看看netty的核心组件解码器Decoder

  •  Decoder的作用
  • 半包,粘包问题
  • 从模板和装饰器模式看Decoder解码原理

1.Decoder作用

 最根本的就是将网络中的二进制数据,解析成为目标对象(Object)。

抽象出来本质就是上图,其它任何动作都是在完成这个目的。

对吧,学习一定要抓住本质。

先来看看网络数据再网络和操作系统底层是怎么传输的

a.网络数据是以二进制的数据包进行传输的,在netty中是以bytefuf数据包进行传递。这个概念之     后  章节会讲到。

b.会涉及tcp/ipc传输协议。

 先来看看TCP/IP协议的数据封装和分用过程,大致如下图所示:

 

可以看到没往下传输一层,会加上一个每层的首部。这个可以理解为,唐僧西天取经的过程中,通关文牒都要盖一个章一样。不加这个人家不认你。用的时候在解析出来报文体。

这里就不得不提一个概念MSS(TCP[数据包]每次能够传输的最大数据分段)

后面在传输过程中,超过这个值,数据包会进行拆分;小于这个值,数据包会进行合并。

c.dma复制

简单讲 就是绕过cpu,直接操作内存在设备间传输数据。

具体流程如下:

dam传输数据时,cpu不参与,dma处理完之后通知cpu进行后续处理。

理解这几个点之后,我们再来看一看正真的数据传播流程:

1.首先发送端,在用户缓存区里的bytebuf数据包会进行一次cpu复制;

2.cpu复制之后数据会缓存在发送缓冲区的内核空间里,这时候数据是完整的;

3.内核缓冲区进行一次DMA复制,数据被写入网卡设备中,此时网卡里面的数据包会进行重组;

4.写入网卡的数据通过TCP/IP协议进行传输,组装成新的二进制数据包,有最大数据限制;

5.接收端通过TCP/IP协议接收到二进制数据包,此时数据是完整的;

6.接收端在内核缓冲区进行一次DMA复制,形成新的bytebuf数据包;

7.接收端进行一次cpu复制,bytebuf数据包被写入用户缓冲区。

具体流程如下图:

这个过程会涉及两次分割二进制包:

a.传输过程中的分割;

b.系统复制过程中的分割;

所以粘包,和半包问题的出现就发生在这两个过程中,数据包少了,多个合在一起就会造成粘包。数据包多了,就会进行分割,分割就会打破原来的数据结构,出现半包问题。

来看看netty Decoder是怎么解决这个问题的

1.通过ByteToMessageDecoder 将二进制数据转换成对象

2.通过MessageToMessageDecoder将对象数据转换成对象

先来看看ByteToMessageDecoder 它是一个基类,主要使用模板方法接受管理bytebuf,子类通过实现钩子方法,处理业务逻辑,处理完业务逻辑将写入List<Object>中,之后在流水线上发送到下一站。流水线概念会在之后一章单独讲解。

public class Byte2IntegerDecoder extends ByteToMessageDecoder {

    //钩子实现
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        while (in.readableBytes() >= 4) {
            Integer anInt = in.readInt();
            Logger.info("解码出一个整数: " + anInt);
            out.add(anInt);
        }
    }
}

来看看它的具体使用,继承ByteToMessageDecoder,钩子方法里实现业务逻辑。

回顾一下什么是模板方法:

核心点:抽象类会实现一系列公共方法共子类使用。

1.抽象类会定义一系列的模板方法,子类自动继承。

2.子类通过钩子方法处理具体业务逻辑。

好,我们再看一个它的核心子类ReplayingDecoder(回放解码器)。

什么是回放呢?

我们读取数据的时候是在操作一个二进制数组,数组元素完整的话,指针移动没有问题。但是数据元素不完整,再次读取的时候记数指针会回到开头重新执行。

netty通过checkpoint指针来完成指针回放

再来看看ReplayingDecoder怎么结合模板模式和装饰器模式完成整个解码操作的:

使用模板模式主要是它继承ByteToMessageDecoder父类解码器,实现decode方法。重点看看包装器的使用:

回放解码器在解码的时候会对传进来的bytebuf进行一次包装:

就是这个 replayable.setCumulation(in);

netty 实现了一个 ReplayingDecoderByteBuf,它继承了ByteBuf,所以可以对对传进来的bytebuf进行操作,这个过程就是包装器在起作用。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        replayable.setCumulation(in);
        try {
            while (in.isReadable()) {
                int oldReaderIndex = checkpoint = in.readerIndex();
                int outSize = out.size();

                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

简化一下看个简单的包装器的例子

public class WrapperModelDemo {

    // Pojo -- 被包装的类型
    static class Pojo implements Sth {
        public void a() {
            System.out.println("a");
        }

        public void b() {
            System.out.println("b");
        }

        public void c() {
            System.out.println("c");
        }

        public void X() {
            System.out.println("ALL - X");
        }
    }

// Wrapper模式

    static class PojoWrapper implements Sth {
        private Sth inner;

        protected PojoWrapper() {

        }

        public void setInner(Sth inner) {
            this.inner = inner;
        }

        @Override
        public void a() {
            System.out.println("PojoWrapper - a");
            inner.a();
        }

        @Override
        public void b() {
            System.out.println("PojoWrapper - a");
            inner.b();
        }

        @Override
        public void c() {
            throw new UnsupportedOperationException("... c  ");
        }

        @Override
        public void X() {
            throw new UnsupportedOperationException("... X");
        }
    }


    @Test
    public void testWrapper() throws Exception {
        Sth pojo = new Pojo();

        pojo.a();
        pojo.b();
        pojo.c();

        PojoWrapper pojoWrapper = new PojoWrapper();
        pojoWrapper.setInner(pojo);


        // 可以尝试注释掉上面的某一行代码, 查看输出结果
        pojoWrapper.a();
        pojoWrapper.b();
        pojoWrapper.c();
    }

}

运行结果:

核心是包装类在实现sth接口时,又把它作为了一个属性设置进来了。

这样结合着看,对回放解码器的解码过程理解起来就会更加清楚。

现在来看看 MessageToMessageDecoder解码器,它的传入对象不再是一个bytebuf,而是一个具体的对象

public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        String target = String.valueOf(msg);
        out.add(target);
    }
}

它有几个重要的子类:

LineBasedFrameDecoder:基于行的解码器,通过换行符(\n 或者 \r\n)来作为帧的分隔符。当接收到数据时,它会在数据中查找换行符,一旦找到,就将从上次解码位置到换行符之间的数据作为一个完整的帧进行解码并返回。如果在接收到的数据中没有找到换行符,那么数据会被缓存起来,直到换行符出现或者缓冲区满了才进行处理。
DelimiterBasedFrameDecoder:基于分隔符的解码器,它允许用户自定义帧的分隔符。在接收到数据后,它会根据用户提供的分隔符(一个或多个字节数组)在数据中进行查找。一旦找到分隔符,就将从上次解码位置到分隔符之前的数据作为一个完整的帧返回。同样,如果没有找到分隔符,数据会被缓存,直到分隔符出现或缓冲区达到一定条件。
LengthFieldBasedFrameDecoder:通过消息中定义的长度字段来确定帧的长度。解码器会首先读取指定长度的字节,这些字节表示后续帧数据的长度。然后,根据这个长度值,从数据流中读取相应长度的数据作为一个完整的帧。这种方式可以精确地定位每个帧的边界,即使数据存在粘包和拆包的情况。

 下面是几个具体实例,可以跑跑看看效果

public class NettyOpenBoxDecoder {
    public static final int MAGICCODE = 9999;
    public static final int VERSION = 100;
    static final String SPLITER = "\r\n";
    static final String SPLITER_3 = "\n";
    static final String SPLITER_2 = "\t";
    static final String CONTENT = "netty:18罗汉系列!";

    /**
     * LineBasedFrameDecoder 使用实例
     */
    @Test
    public void testLineBasedFrameDecoder() {
        try {
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 0; j < 100; j++) {

                //1-3之间的随机数
                int random = RandomUtil.randInMod(3);
                ByteBuf buf = Unpooled.buffer();
                for (int k = 0; k < random; k++) {
                    buf.writeBytes(CONTENT.getBytes("UTF-8"));
                }

                buf.writeBytes(SPLITER.getBytes("UTF-8"));

                channel.writeInbound(buf);
            }


            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    /**
     * LineBasedFrameDecoder 使用实例
     */
    @Test
    public void testDelimiterBasedFrameDecoder() {
        try {
            final ByteBuf delimiter = Unpooled.copiedBuffer(SPLITER_2.getBytes("UTF-8"));

            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(
                            new DelimiterBasedFrameDecoder(1024, true, delimiter));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };

            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 0; j < 100; j++) {

                //1-3之间的随机数
                int random = RandomUtil.randInMod(3);
                ByteBuf buf = Unpooled.buffer();
                for (int k = 0; k < random; k++) {
                    buf.writeBytes(CONTENT.getBytes("UTF-8"));
                }

                buf.writeBytes(SPLITER_2.getBytes("UTF-8"));


                channel.writeInbound(buf);
            }


            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * LineBasedFrameDecoder 使用实例
     */
    @Test
    public void testLengthFieldBasedFrameDecoder() {
        try {

// 1、单字节(无符号):0到255;(有符号):-128到127。
//
//2、双字节(无符号):0到65535;(有符号):-32768 到 32765。
//
//3、四字节(无符号):0到42 9496 7295;(有符号):-21 4748 3648到21 4748 3647。

            //定义一个 基于长度域解码器
            final LengthFieldBasedFrameDecoder decoder =
                    new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);


            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(decoder);
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };


            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 0; j < 100; j++) {
                //1-3之间的随机数
                int random = RandomUtil.randInMod(3);

                ByteBuf buf = Unpooled.buffer();
                byte[] bytes = CONTENT.getBytes("UTF-8");

               //首先 写入头部  head,也就是后面的数据长度

                buf.writeInt(bytes.length * random);

                //然后 写入content

                for (int k = 0; k < random; k++) {
                    buf.writeBytes(bytes);
                }

                channel.writeInbound(buf);
            }


            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }



    /**
     * LengthFieldBasedFrameDecoder 使用实例
     */
    @Test
    public void testLengthFieldBasedFrameDecoder1() {
        try {

            final LengthFieldBasedFrameDecoder spliter =
                    new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(spliter);
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 1; j <= 100; j++) {
                ByteBuf buf = Unpooled.buffer();
                String s = j + "次发送->" + CONTENT;
                byte[] bytes = s.getBytes("UTF-8");
                buf.writeInt(bytes.length);
                System.out.println("bytes length = " + bytes.length);
                buf.writeBytes(bytes);
                channel.writeInbound(buf);
            }

            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }


    /**
     * LengthFieldBasedFrameDecoder 使用实例
     */
    @Test
    public void testLengthFieldBasedFrameDecoder2() {
        try {

            final LengthFieldBasedFrameDecoder spliter =
                    new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(spliter);
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 1; j <= 100; j++) {

              // 分配一个bytebuf
                ByteBuf buf = Unpooled.buffer();

                String s = j + "次发送->" + CONTENT;

                byte[] bytes = s.getBytes("UTF-8");


                //首先 写入头部  head,包括 后面的数据长度

                buf.writeInt(bytes.length);

                buf.writeChar(VERSION);

                //然后 写入  content
                buf.writeBytes(bytes);

                channel.writeInbound(buf);
            }

            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }


    /**
     * LengthFieldBasedFrameDecoder 使用实例 3
     */
    @Test
    public void testLengthFieldBasedFrameDecoder3() {
        try {

            final LengthFieldBasedFrameDecoder spliter =
                    new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline().addLast(spliter);
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 1; j <= 100; j++) {
                ByteBuf buf = Unpooled.buffer();
                String s = j + "次发送->" + CONTENT;
                byte[] bytes = s.getBytes("UTF-8");
                buf.writeChar(VERSION);
                buf.writeInt(bytes.length);
                buf.writeInt(MAGICCODE);
                buf.writeBytes(bytes);
                channel.writeInbound(buf);
            }

            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }


}

最后总结一下:

本文主要是对netty解码器的理解,核心点在 数据在网络和操作系统间的传播和netty结合模板模式装饰器模式解决netty解码过程中的数据问题。

预告一下,下一篇是18罗汉的第二章,netty编码器。


http://www.kler.cn/a/558955.html

相关文章:

  • 一文弄懂RSA算法中的TLS握手流程
  • 大白话“讲”哈希表
  • 数据结构与算法-搜索-双向搜索 和 A*算法(字串变换,八数码,第k短路)
  • 【Vue3+Tres 三维开发】03 - 基本操作
  • 习题解答 | 一维差分与等差数列差分
  • 如何通过 Docker 在没有域名的情况下快速上线客服系统
  • Unity for Python —— 强大的 Python 脚本支持提升 Unity 编辑器效率
  • C语言递归——青蛙跳台阶问题和汉诺塔问题
  • 辗转相除法(欧几里得算法)
  • transformer架构嵌入层位置编码之RoPE旋转位置编码及简单实现示例
  • go-zero学习笔记(五)
  • Windows系统第一次运行C语言程序,环境配置,软件安装等遇到的坑及解决方法
  • 嵌入式之内存管理
  • 【2025.2最新版】从零开始的HTML网页开发学习笔记(包含网页基础概念 HTML语法 前端工具VsCode介绍)
  • mysql之B+ 树索引 (InnoDB 存储引擎)机制
  • 反射和注解
  • 自制操作系统前置知识汇编学习
  • 实验-安装Proteus
  • ZLMediaKi集群设置
  • 简说spring 的设计模式