当前位置: 首页 > article >正文

Netty系列-7 Netty编解码器

背景

netty框架中,自定义解码器的起点是ByteBuf类型的消息, 自定义编码器的终点是ByteBuf类型。

1.解码器

业务解码器的起点是ByteBuf类型

netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter使用默认方式(不处理,向下传递事件)实现了所有的Inbound接口。因此,MessageToMessageEncoder只需要重写channelRead方法,并在该方法中提取消息、转换消息、通过ChannelInvoker将转换后的消息以channelRead事件发向pipeline即可。
MessageToMessageEncoder抽象类的实现如下:

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {

    private final TypeParameterMatcher matcher;

    protected MessageToMessageDecoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

    protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
    }
    
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                int size = out.size();
                for (int i = 0; i < size; i++) {
                    ctx.fireChannelRead(out.getUnsafe(i));
                }
            } finally {
                out.recycle();
            }
        }
    }

    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

1.1 类型的匹配器

MessageToMessageDecoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定解码器可以处理的消息类型。可通过构造函数为其设置类型,也可通过泛型指定:

// 使用泛型类型
protected MessageToMessageDecoder() {
    matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}

// 子类调用MessageToMessageDecoder构造器时,传入类型
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
    matcher = TypeParameterMatcher.get(inboundMessageType);
}

一般,通过泛型指定解码器处理的消息对象,即使用MessageToMessageDecoder的无参构造函数。
acceptInboundMessage方法封装matcher的实现,返回布尔值,表示是否支持处理msg消息类型:

public boolean acceptInboundMessage(Object msg) throws Exception {
    return matcher.match(msg);
}

根据matcher的match方法:

private static final class ReflectiveMatcher extends TypeParameterMatcher {
    private final Class<?> type;

    //...

    @Override
    public boolean match(Object msg) {
        // msg消息是否为type类型或者其子类型
        return type.isInstance(msg);
    }
}

1.2 channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 构造List列表对象,存储解码后的对象
    CodecOutputList out = CodecOutputList.newInstance();
    try {
        // 判断是否支持处理消息
        if (acceptInboundMessage(msg)) {
            I cast = (I) msg;
            try {
                // 处理消息,将cast对象解码后的结果存放到out列表中
                decode(ctx, cast, out);
            } finally {
                ReferenceCountUtil.release(cast);
            }
        } else {
            // 不处理消息,以原样保存
            out.add(msg);
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        try {
            int size = out.size();
            // 遍历列表,依次向pipeline触发解码后的对象
            for (int i = 0; i < size; i++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
        } finally {
            out.recycle();
        }
    }
}

逻辑较为清晰:
[1] 构造列表对象out,用于临时存放解码后的消息;
[2] 判断当前解码器是否可以处理该消息,不可以处理,直接添加到out中;可以处理,调用decode方法解码消息,解码结果都添加到out中;
[3] 遍历out列表,将消息以ChannelRead事件传递给向pipeline;
[4] out清理、回收再利用;

1.3 decode方法

decode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:

protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

将msg解码,解码后的对象存放在out中;由于out是数组,因此可以从msg中解码出一个对象,也可以解码出多个。如下所示:

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    out.add(msg.toString(charset));
}

将ByteBuf类型的msg消息转为一个String类型的对象;

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    String[] decodedMsgs = msg.toString(charset).split(";");
    for (String decodedMsg: decodedMsgs) {
        out.add(decodedMsg);
    }
}

将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。

2.解码器案例

案例的结构图如下所示,消息流入解码器和流出时的消息类型会发生变化:
在这里插入图片描述
引入三个解码器和一个业务Handler:
[1] 编码器1实现ByteBuf->String类型的转换;
[2] 编码器2实现String->Message1类型的转换;
[3] 编码器3实现Message1->Message2类型的转换;
[4] 业务Handler打印消息类型和消息;
实现类依次为:

// MyMessageDecoder1
public class MyMessageDecoder1 extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        out.add(msg.toString(Charset.defaultCharset()));
    }
}

// MyMessageDecoder2
class MyMessageDecoder2 extends MessageToMessageDecoder<String> {
    @Override
    protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) {
        String[] decodedMsgs = msg.split(";");
        for (String decodedMsg : decodedMsgs) {
            out.add(new Message1(decodedMsg));
        }
    }
}

// MyMessageDecoder3
class MyMessageDecoder3 extends MessageToMessageDecoder<Message1> {
    @Override
    protected void decode(ChannelHandlerContext ctx, Message1 msg, List<Object> out) {
            out.add(new Message2(msg.getContent()));
    }
}

业务Handler定义如下:

private static class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg);
    }
}

Message1和Message2消息定义如下:

@Data
@RequiredArgsConstructor
pulic class Message1 {
    private final String content;
}

@Data
@RequiredArgsConstructor
pulic class Message2 {
    private final String content;
}

客户端发送消息:"test1;test2;test3"时:

Microsoft Telnet> send test1;test2;test3
发送字符串 test1;test2;test3
Microsoft Telnet>

服务器日志如下所示:

Message2(content=test1)
Message2(content=test2)
Message2(content=test3)

注意:解码的顺序沿着pipeline进行,因此需要注意调整netty解码器在pipeline中的位置。

如果将3和解码器2的顺序调整一下:

protected void initChannel(NioSocketChannel channel) {
    channel.pipeline().addLast(new MyMessageDecoder1());
    channel.pipeline().addLast(new MyMessageDecoder3());
    channel.pipeline().addLast(new MyMessageDecoder2());
    channel.pipeline().addLast(new MyHandler());
}

重复上述操作,服务器日志如下:

Message1(content=test1)
Message1(content=test2)
Message1(content=test3)

此时,解码器1流出的数据为String类型,流入解码器2时-类型校验不通过直接以流入的String类型流出,流入解码器3时,将String类型转为Message1类型,流入业务Handler进行打印。

3.编码器

业务编码器的终点是ByteBuf类型

netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,ChannelOutboundHandlerAdapter使用默认方式实现(不处理,向前传递事件)了所有的Outbound接口。因此,MessageToMessageEncoder只需要重写write方法,并在该方法中编码消息、并通过ChannelInvoker将编码后的消息发送到pipeline即可。
MessageToMessageEncoder抽象类的实现如下:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
    private final TypeParameterMatcher matcher;

    protected MessageToMessageEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

    protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
        matcher = TypeParameterMatcher.get(outboundMessageType);
    }

    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
                try {
                    final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {
                        ctx.write(out.getUnsafe(0), promise);
                    } else if (sizeMinusOne > 0) {
                        if (promise == ctx.voidPromise()) {
                            writeVoidPromise(ctx, out);
                        } else {
                            writePromiseCombiner(ctx, out, promise);
                        }
                    }
                } finally {
                    out.recycle();
                }
            }
        }
    }

    private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
        final ChannelPromise voidPromise = ctx.voidPromise();
        for (int i = 0; i < out.size(); i++) {
            ctx.write(out.getUnsafe(i), voidPromise);
        }
    }
    private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        for (int i = 0; i < out.size(); i++) {
            combiner.add(ctx.write(out.getUnsafe(i)));
        }
        combiner.finish(promise);
    }

    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

3.1 类型的匹配器

MessageToMessageEncoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定该编码器器可以处理的消息类型,与解码器中的matcher作用完全相同,不再赘述。

3.2 write方法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    CodecOutputList out = null;
    try {
        // 判断当前编码器是否可以编码消息
        if (acceptOutboundMessage(msg)) {
            out = CodecOutputList.newInstance();
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            try {
                // 编码消息,并将编码后的消息保存到out列表中
                encode(ctx, cast, out);
            } finally {
                ReferenceCountUtil.release(cast);
            }
            if (out.isEmpty()) {
                throw new EncoderException(
                    StringUtil.simpleClassName(this) + " must produce at least one message.");
            }
        } else {
            // 不能编码的消息不处理,直接沿着pipeline向前传递
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new EncoderException(t);
    } finally {
        // 遍历out,依次调用ctx.write,沿着pipeline向前传递
        if (out != null) {
            try {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
                    ctx.write(out.getUnsafe(0), promise);
                } else if (sizeMinusOne > 0) {
                    if (promise == ctx.voidPromise()) {
                        writeVoidPromise(ctx, out);
                    } else {
                        writePromiseCombiner(ctx, out, promise);
                    }
                }
            } finally {
                // 清理out列表,回收再利用
                out.recycle();
            }
        }
    }
}

逻辑较为清晰:
[1] 构造列表对象out,用于临时存放编码后的消息;
[2] 判断当前编码器是否可以处理该消息,不可以处理,直接通过ctx.write沿着pipeline向前传递;可以处理,调用encode方法编码消息,编码结果添加到out中;
[3] 遍历out列表,将消息以write事件传递给向pipeline;
[4] out清理、回收再利用;

3.2 encode方法

encode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:

protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

将msg消息进行编码,编码后的对象存放在out中;由于out是数组,因此可以从msg中编码出一个对象,也可以编码出多个,与解码器逻辑相同。

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    out.add(msg.toString(charset));
}

将ByteBuf类型的msg消息转为一个String类型的对象;

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    String[] decodedMsgs = msg.toString(charset).split(";");
    for (String decodedMsg: decodedMsgs) {
        out.add(decodedMsg);
    }
}

将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。
netty向外发送数据时,一般经过业务Handler->编码器->HeadContext的流程。
向客户端发送消息的底层实现在HeadContext的unsafe对象(NioSocketChannel的unsafe对象)中,而发送前有消息类型判断:

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler{ 
	public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
		unsafe.write(msg, promise);
	}
}

unsafe对象的write方法如下:

public final void write(Object msg, ChannelPromise promise) {
    //...
    msg = filterOutboundMessage(msg);
    //...
}

在真实写操作前,通过filterOutboundMessage进行消息类型的判断:

@Override
protected final Object filterOutboundMessage(Object msg) {
    // 要求消息必须时ByteBuf或者FileRegion类型或其子类型
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        return msg;
    }

    throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

由此,编码器将消息传递给HeadContext前,需要将消息最终编码为ByteBuf类型。

4.解码器案例

案例结构图如下所示:
在这里插入图片描述

在章节2中的案例基础上新增两个编码器,并修改业务Handler:
[1] 业务Handler,接收客户端消息后,响应相同消息;
[2] 编码器1:将Message2类型的消息转为String类型;
[3] 编码器2: 将String类型消息转为ByteBuf类型;
代码实现如下:
修改业务Handler:

private static class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg);
        // 新增逻辑“将消息对象发送给客户端
        ctx.write(msg);
    }
}

添加编码器:

// 将Message2消息转为String消息
public class MyEncoder1 extends MessageToMessageEncoder<Message2> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message2 msg, List<Object> out) throws Exception {
        out.add(msg.getContent());
    }
}

// 将String消息转为ByteBuf消息
public class MyEncoder2 extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), Charset.defaultCharset()));
    }
}

在MyHandler前依次添加解码器MyEncoder2和MyEncoder1:

protected void initChannel(NioSocketChannel channel) {
    channel.pipeline().addLast(new MyMessageDecoder1());
    channel.pipeline().addLast(new MyMessageDecoder2());
    channel.pipeline().addLast(new MyMessageDecoder3());
    channel.pipeline().addLast(new MyEncoder2());
    channel.pipeline().addLast(new MyEncoder1());
    channel.pipeline().addLast(new MyHandler());
}

可以使用Netty写一个客户端, 也可用客户端工具模拟,这里为了方便,使用SocketTool.exe,控制台日志如下:

14:36:15 发送数据:test1;test2;test3[1次]
14:36:15 收到数据:test1test2test3

注意:客户端收到了test1test2test3消息,在客户端开来是一个消息,但在服务器看来是连续发送的3个消息,消息内容分别为test1和test2和test3。这是TCP的流传输模式导致,可在业务层添加额外处理解决这个问题。将在下一篇文件介绍Netty如何处理粘包和分包问题。


http://www.kler.cn/news/330203.html

相关文章:

  • CSP-J模拟赛一补题报告
  • SQL:函数以及约束
  • 【分布式微服务云原生】详解Redis的主从模式,主服务器挂了如何从多个从服务器选出新的主服务器
  • Charles(青花瓷)抓取https请求
  • 基于Hive和Hadoop的共享单车分析系统
  • 【AI知识点】余弦相似度(Cosine Similarity)
  • HCIP-HarmonyOS Application Developer 习题(四)
  • 5G-A和F5G-A,对于AI意味着什么?
  • 【MySQL】MVCC及其实现原理
  • (28)oracle数据迁移(容器)-部署包资源
  • Linux驱动开发(速记版)--设备树
  • 滚雪球学Oracle[2.3讲]:Oracle Listener配置与管理
  • 路由交换实验指南
  • MongoDB的安装与增删改查基本操作
  • Mars的xlog与Logan
  • JMeter中线程组、HTTP请求的常见参数解释
  • 欧科云链OKLink相约TOKEN2049:更全面、多元与安全
  • scrapy爬虫基础
  • n!尾随零的数量
  • PHP反射机制