Netty基础—6.Netty实现RPC服务二
大纲
1.RPC的相关概念
2.RPC服务调用端动态代理实现
3.Netty客户端之RPC远程调用过程分析
4.RPC网络通信中的编码解码器
5.Netty服务端之RPC服务提供端的处理
6.RPC服务调用端实现超时功能
3.Netty客户端之RPC远程调用过程分析
NettyRpcClient.remoteCall()方法的执行逻辑:
说明一:Netty的客户端和服务端通过connect()方法建立连接后,就会通过sync()方法进行同步阻塞。
说明二:RPC调用其实就是通过调用remoteCall()方法,往Netty客户端的Channel的writeAndFlush()方法写入请求数据,同时也通过sync()方法进行同步阻塞,以便可以等到Netty服务端的响应,从而获得RPC调用结果。
说明三:writeAndFlush()所写的请求数据会经过客户端Channel的pipeline进行处理如编码成二进制字节数组,然后传输给服务端的Channel。
说明四:服务端的Channel收到请求数据后,会经过其pipeline处理如解码二进制字节数据成对象来反射调用对应的方法,然后服务端将反射调用的结果作为响应数据编码后发送回客户端,最后客户端的Channel收到数据解码后获取的响应对象便是RPC调用结果。
public class NettyRpcClient {
...
//如果要实现超时功能,需要在remoteCall()方法被执行时设置该请求的发起时间
//然后在NettyRpcClientHandler的channelRead()中计算是否超时
public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
channelFuture.channel().writeAndFlush(rpcRequest).sync();
RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();
logger.info("receives response from netty rpc server.");
if (rpcResponse.isSuccess()) {
return rpcResponse;
}
throw rpcResponse.getException();
}
}
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);
private RpcResponse rpcResponse;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
this.rpcResponse = (RpcResponse) msg;
logger.error("Netty RPC client receives the response: " + rpcResponse);
}
public RpcResponse getRpcResponse() {
while (rpcResponse == null) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
logger.error("wait for response interrupted", e);
}
}
return rpcResponse;
}
}
4.RPC网络通信中的编码解码器
(1)RPC的请求响应通信协议
(2)使用Hessian进行序列化与反序列化
(3)RPC的编码器
(4)RPC的解码器
(5)如何解决粘包拆包问题
(1)RPC的请求响应通信协议
//RPC请求
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private String[] parameterClasses;//参数类型
private Object[] parameters;//参数值
private String invokerApplicationName;//调用方的服务名称
private String invokerIp;//调用方的IP地址
...
}
//RPC响应
public class RpcResponse {
private String requestId;
private boolean isSuccess;
private Object result;
private Throwable exception;
...
}
(2)Hessian序列化与反序列化
需要将请求对象和响应对象序列化成二进制字节数组,以及将获取到的二进制字节数组反序列化成请求对象和响应对象,这里使用Hessian框架来实现序列化与反序列化。
public class HessianSerialization {
//序列化:将对象序列化成字节数组
public static byte[] serialize(Object object) throws IOException {
//new一个字节数组输出流
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
//根据字节数组输出流new一个Hessian序列化输出流
HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
//用Hessian序列化输出流去写object
hessianOutput.writeObject(object);
//将Hessian序列化输出流转化为字节数组
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
}
//反序列化:将字节数组还原成对象
public static Object deserialize(byte[] bytes, Class clazz) throws IOException {
//先封装一个字节数组输入流
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
//将字节数组输入流封装到Hessian序列化输入流里去
HessianInput hessianInput = new HessianInput(byteArrayInputStream);
//从Hessian序列化输入流读出一个对象
Object object = hessianInput.readObject(clazz);
return object;
}
}
下面对RpcRequest和RpcResponse进行Hessian序列化测试。注意:RpcRequest和RpcResponse这两个类必须要实现Serializable。
public class HessianSerializationTest {
public static void main(String[] args) throws Exception {
RpcRequest rpcRequest = new RpcRequest();//先new一个RpcRequest对象
rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));
rpcRequest.setClassName("TestClass");
rpcRequest.setMethodName("sayHello");
rpcRequest.setParameterClasses(new String[]{"String"});
rpcRequest.setParameters(new Object[]{"wjunt"});
rpcRequest.setInvokerApplicationName("RpcClient");
rpcRequest.setInvokerIp("127.0.0.1");
byte[] bytes = HessianSerialization.serialize(rpcRequest);//进行序列化
System.out.println(bytes.length);
RpcRequest deSerializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes, RpcRequest.class);
System.out.println(deSerializedRpcRequest);
}
}
(3)RPC的编码器
编码可以理解为进行序列化操作,解码可以理解为进行反序列化操作。
编码器RpcEncoder需要继承Netty的MessageToByteEncoder类,解码器RpcDecoder需要继承Netty的ByteToMessageDecoder类。
反序列化的逻辑需要根据序列化时数据的封装逻辑来处理,比如下面编码后的一条数据是由字节数组长度 + 字节数组组成的,因此反序列化需要根据此来写对应的逻辑。
public class RpcEncoder extends MessageToByteEncoder {
//要进行序列化的目标类
private Class<?> targetClass;
public RpcEncoder(Class<?> targetClass) {
this.targetClass = targetClass;
}
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
//传入的对象o是否是Encoder所指定的类的实例对象
if (targetClass.isInstance(o)) {
byte[] bytes = HessianSerialization.serialize(o);
//将序列化好的字节数组写到byteBuf里去
//先写数据长度到byteBuf,这个长度就是4个字节的bytes的length
byteBuf.writeInt(bytes.length);
//然后再写完整的bytes数组到byteBuf
byteBuf.writeBytes(bytes);
}
}
}
(4)RPC的解码器
解码器的主要步骤如下:
步骤一:消息长度校验与读索引标记
步骤二:消息长度负值校验与拆包校验
步骤三:拆包问题处理与读索引复位
步骤四:将字节数组反序列化为指定类
public class RpcDecoder extends ByteToMessageDecoder {
private static final int MESSAGE_LENGTH_BYTES = 4;
private static final int MESSAGE_LENGTH_VALID_MINIMUM_VALUE = 0;
private Class<?> targetClass;
public RpcDecoder(Class<?> targetClass) {
this.targetClass = targetClass;
}
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//1.消息长度校验
//首先校验消息长度的字节数,也就是byteBuf当前可读的字节数,必须达到4个字节,此时才可以继续往下走
if (byteBuf.readableBytes() < MESSAGE_LENGTH_BYTES) {
return;
}
//2.读索引标记
//对于byteBuf当前可以读的readerIndex,进行mark标记,也就是进行读索引标记
//后续可以通过这个mark标记,找回来重新发起read读取之前的一个readerIndex位置
byteBuf.markReaderIndex();
//3.读取消息长度
//读取4个字节的int,int代表了消息bytes的长度
int messageLength = byteBuf.readInt();
//4.消息长度负值校验
//如果此时消息长度是小于0,说明此时通信已经出现了故障
if (messageLength < MESSAGE_LENGTH_VALID_MINIMUM_VALUE) {
channelHandlerContext.close();
}
//5.拆包校验
//判断可读字节数是否小于消息长度,若是则出现了拆包,需要对byteBuf的读索引进行复位,下次再读
//byteBuf.readableBytes()读完4个字节后继续读byteBuf.readableBytes()
//如果此时消息字节数据没有接收完整,那么可以读的字节数是比消息字节长度小的,这就是检查经典的拆包问题
//这时需要进行读索引进行复位,本次不再进行数据处理
if (byteBuf.readableBytes() < messageLength) {
byteBuf.resetReaderIndex();
//出现拆包后,等待下次数据输入时再进行分析
//EventLoop里有个for循环会不断监听Channel的读事件;
//当数据还在传输时,由于传输是一个持续的过程,所以在传输数据过程中,Channel会一直产生读事件;
//这个过程中,只要循环回来执行判断,就肯定满足监听到Channel的读事件;
//因此在数据还没传输完成时,for循环执行到去判断是否有Channel的读事件,就会出现这种拆包问题;
//所以只要返回不处理并且复位读索引,那么下次for循环到来又可重新处理该Channel的读事件了;
return;
}
//6.将字节数组反序列化为指定类
byte[] bytes = new byte[messageLength];
byteBuf.readBytes(bytes);
Object object = HessianSerialization.deserialize(bytes, targetClass);
list.add(object);
}
}
(5)如何解决粘包拆包问题
首先在编码数据包时,需要在数据包开头添加4个字节的int类型的bytes.length,之后任何一个数据包的读取,都必须从4个字节的int(bytes.length)值开始读取,再按照int值读取后续指定数量的字节数,都读取完才能证明读取到一个完整的字节数组。从而解决粘包半包问题,其原理类似于基于长度域的解码器LengthFieldBasedDecoder。