当前位置: 首页 > article >正文

Netty基础—6.Netty实现RPC服务二

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

3.Netty客户端之RPC远程调用过程分析

NettyRpcClient.remoteCall()方法的执行逻辑:

说明一:Netty的客户端和服务端通过connect()方法建立连接后,就会通过sync()方法进行同步阻塞。

说明二:RPC调用其实就是通过调用remoteCall()方法,往Netty客户端的Channel的writeAndFlush()方法写入请求数据,同时也通过sync()方法进行同步阻塞,以便可以等到Netty服务端的响应,从而获得RPC调用结果。

说明三:writeAndFlush()所写的请求数据会经过客户端Channel的pipeline进行处理如编码成二进制字节数组,然后传输给服务端的Channel。

说明四:服务端的Channel收到请求数据后,会经过其pipeline处理如解码二进制字节数据成对象来反射调用对应的方法,然后服务端将反射调用的结果作为响应数据编码后发送回客户端,最后客户端的Channel收到数据解码后获取的响应对象便是RPC调用结果。

public class NettyRpcClient {
    ...
    //如果要实现超时功能,需要在remoteCall()方法被执行时设置该请求的发起时间
    //然后在NettyRpcClientHandler的channelRead()中计算是否超时
    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();
        logger.info("receives response from netty rpc server.");

        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}

public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);
    private RpcResponse rpcResponse;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.rpcResponse = (RpcResponse) msg;
        logger.error("Netty RPC client receives the response: " + rpcResponse);
    }

    public RpcResponse getRpcResponse() {
        while (rpcResponse == null) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                logger.error("wait for response interrupted", e);
            }
        }
        return rpcResponse;
    }
}

4.RPC网络通信中的编码解码器

(1)RPC的请求响应通信协议

(2)使用Hessian进行序列化与反序列化

(3)RPC的编码器

(4)RPC的解码器

(5)如何解决粘包拆包问题

(1)RPC的请求响应通信协议

//RPC请求
public class RpcRequest {
    private String requestId;
    private String className;
    private String methodName;
    private String[] parameterClasses;//参数类型
    private Object[] parameters;//参数值
    private String invokerApplicationName;//调用方的服务名称
    private String invokerIp;//调用方的IP地址
    ...
}

//RPC响应
public class RpcResponse {
    private String requestId;
    private boolean isSuccess;
    private Object result;
    private Throwable exception;
    ...
}

(2)Hessian序列化与反序列化

需要将请求对象和响应对象序列化成二进制字节数组,以及将获取到的二进制字节数组反序列化成请求对象和响应对象,这里使用Hessian框架来实现序列化与反序列化。

public class HessianSerialization {
    //序列化:将对象序列化成字节数组
    public static byte[] serialize(Object object) throws IOException {
        //new一个字节数组输出流
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        //根据字节数组输出流new一个Hessian序列化输出流
        HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
        //用Hessian序列化输出流去写object
        hessianOutput.writeObject(object);
        //将Hessian序列化输出流转化为字节数组
        byte[] bytes = byteArrayOutputStream.toByteArray();
        return bytes;
    }
    
    //反序列化:将字节数组还原成对象
    public static Object deserialize(byte[] bytes, Class clazz) throws IOException {
        //先封装一个字节数组输入流
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        //将字节数组输入流封装到Hessian序列化输入流里去
        HessianInput hessianInput = new HessianInput(byteArrayInputStream);
        //从Hessian序列化输入流读出一个对象
        Object object = hessianInput.readObject(clazz);
        return object;
    }
}

下面对RpcRequest和RpcResponse进行Hessian序列化测试。注意:RpcRequest和RpcResponse这两个类必须要实现Serializable。

public class HessianSerializationTest {
    public static void main(String[] args) throws Exception {
        RpcRequest rpcRequest = new RpcRequest();//先new一个RpcRequest对象
        rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));
        rpcRequest.setClassName("TestClass");
        rpcRequest.setMethodName("sayHello");
        rpcRequest.setParameterClasses(new String[]{"String"});
        rpcRequest.setParameters(new Object[]{"wjunt"});
        rpcRequest.setInvokerApplicationName("RpcClient");
        rpcRequest.setInvokerIp("127.0.0.1");

        byte[] bytes = HessianSerialization.serialize(rpcRequest);//进行序列化
        System.out.println(bytes.length);

        RpcRequest deSerializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes, RpcRequest.class);
        System.out.println(deSerializedRpcRequest);
    }
}

(3)RPC的编码器

编码可以理解为进行序列化操作,解码可以理解为进行反序列化操作。

编码器RpcEncoder需要继承Netty的MessageToByteEncoder类,解码器RpcDecoder需要继承Netty的ByteToMessageDecoder类。

反序列化的逻辑需要根据序列化时数据的封装逻辑来处理,比如下面编码后的一条数据是由字节数组长度 + 字节数组组成的,因此反序列化需要根据此来写对应的逻辑。

public class RpcEncoder extends MessageToByteEncoder {
    //要进行序列化的目标类
    private Class<?> targetClass;
    
    public RpcEncoder(Class<?> targetClass) {
        this.targetClass = targetClass;
    }
    
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        //传入的对象o是否是Encoder所指定的类的实例对象
        if (targetClass.isInstance(o)) {
            byte[] bytes = HessianSerialization.serialize(o);

            //将序列化好的字节数组写到byteBuf里去
            //先写数据长度到byteBuf,这个长度就是4个字节的bytes的length
            byteBuf.writeInt(bytes.length);
            //然后再写完整的bytes数组到byteBuf
            byteBuf.writeBytes(bytes);
        }
    }
}

(4)RPC的解码器

解码器的主要步骤如下:

步骤一:消息长度校验与读索引标记

步骤二:消息长度负值校验与拆包校验

步骤三:拆包问题处理与读索引复位

步骤四:将字节数组反序列化为指定类

public class RpcDecoder extends ByteToMessageDecoder {
    private static final int MESSAGE_LENGTH_BYTES = 4;
    private static final int MESSAGE_LENGTH_VALID_MINIMUM_VALUE = 0;

    private Class<?> targetClass;
    public RpcDecoder(Class<?> targetClass) {
        this.targetClass = targetClass;
    }

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //1.消息长度校验
        //首先校验消息长度的字节数,也就是byteBuf当前可读的字节数,必须达到4个字节,此时才可以继续往下走
        if (byteBuf.readableBytes() < MESSAGE_LENGTH_BYTES) {
            return;
        }
      
        //2.读索引标记
        //对于byteBuf当前可以读的readerIndex,进行mark标记,也就是进行读索引标记
        //后续可以通过这个mark标记,找回来重新发起read读取之前的一个readerIndex位置
        byteBuf.markReaderIndex();

        //3.读取消息长度
        //读取4个字节的int,int代表了消息bytes的长度
        int messageLength = byteBuf.readInt();

        //4.消息长度负值校验
        //如果此时消息长度是小于0,说明此时通信已经出现了故障
        if (messageLength < MESSAGE_LENGTH_VALID_MINIMUM_VALUE) {
            channelHandlerContext.close();
        }
      
        //5.拆包校验
        //判断可读字节数是否小于消息长度,若是则出现了拆包,需要对byteBuf的读索引进行复位,下次再读
        //byteBuf.readableBytes()读完4个字节后继续读byteBuf.readableBytes()
        //如果此时消息字节数据没有接收完整,那么可以读的字节数是比消息字节长度小的,这就是检查经典的拆包问题
        //这时需要进行读索引进行复位,本次不再进行数据处理
        if (byteBuf.readableBytes() < messageLength) {
            byteBuf.resetReaderIndex();
            //出现拆包后,等待下次数据输入时再进行分析
            //EventLoop里有个for循环会不断监听Channel的读事件;
            //当数据还在传输时,由于传输是一个持续的过程,所以在传输数据过程中,Channel会一直产生读事件;
            //这个过程中,只要循环回来执行判断,就肯定满足监听到Channel的读事件;
            //因此在数据还没传输完成时,for循环执行到去判断是否有Channel的读事件,就会出现这种拆包问题;
            //所以只要返回不处理并且复位读索引,那么下次for循环到来又可重新处理该Channel的读事件了;
            return;
        }
      
        //6.将字节数组反序列化为指定类
        byte[] bytes = new byte[messageLength];
        byteBuf.readBytes(bytes);
        Object object = HessianSerialization.deserialize(bytes, targetClass);
        list.add(object);
    }
}

(5)如何解决粘包拆包问题

首先在编码数据包时,需要在数据包开头添加4个字节的int类型的bytes.length,之后任何一个数据包的读取,都必须从4个字节的int(bytes.length)值开始读取,再按照int值读取后续指定数量的字节数,都读取完才能证明读取到一个完整的字节数组。从而解决粘包半包问题,其原理类似于基于长度域的解码器LengthFieldBasedDecoder。


http://www.kler.cn/a/592804.html

相关文章:

  • 痉挛性斜颈护理宝典:重拾生活平衡
  • 2025-03-19 学习记录--C/C++-C语言-单链表的结构体定义 + LNode * 和 LinkList 的区别
  • 如何在 HTML 中实现无障碍访问,列举关键措施?
  • NAT及P2P通信
  • 比较常见的几种排序算法
  • 利用knn算法实现手写数字分类
  • Kafka-QA
  • 前端字段名和后端不一致?解锁 JSON 映射的“隐藏规则” !!!
  • 批量删除 PPT 中的所有图片、某张指定图片或者所有二维码图片
  • 链式二叉树概念和结构
  • GPU视频编解码:X86 DeepStream 视频编解码入门(三)
  • PostgreSQL逻辑复制槽功能
  • 华为全流程全要素研发项目管理(81页PPT)(文末有下载方式)
  • 【从零开始学习计算机科学与技术】计算机网络(六)传输层
  • java后端怎么写好根据角色控制查询不同数据,
  • c++图论(三)之图的遍历
  • 【图论】FLOYD弗洛伊德算法-最短路径
  • js给后端发送请求的方式有哪些
  • 【C++】动态规划从入门到精通
  • C++20 中的同步输出流:`std::basic_osyncstream` 深入解析与应用实践