Netty基础—8.Netty实现私有协议栈一
大纲
1.私有协议介绍
2.私有协议的通信模型
3.私有协议栈的消息定义
4.私有协议栈链路的建立
5.私有协议栈链路的关闭
6.私有协议栈的心跳机制
7.私有协议栈的重连机制
8.私有协议栈的重复登录保护
9.私有协议栈核心的ChannelHandler
10.私有协议栈的客户端和服务端
11.私有协议栈的Packet数据包与编解码
12.私有协议栈的会话ID处理器
13.私有协议栈的握手处理器
14.私有协议栈的链路保活处理器
1.私有协议介绍
(1)什么是私有协议
(2)公有协议与私有协议
(3)使用Netty定制私有协议
(4)私有协议栈的基本功能
(1)什么是私有协议
跨节点的远程服务调用(跨节点通信),除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令。例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起后就会形成私有协议。私有协议并没有标准的定义,只要是能够用于跨进程、跨主机数据交换的非标准协议,都可以称为私有协议。
(2)公有协议与私有协议
公有协议是类似于HTTP、WebSocket这样公开的一套协议,所有人都可以基于这个协议来进行通信。
私有协议是类似于RPC这样的自定义的通信协议,需要自定义请求消息应包含哪些内容和响应消息应包含哪些内容。
偏向于中间件类的软件产品,内部的各节点之间通常使用私有协议进行通信。Java Web系统、业务系统内部之间的通信,则通常使用Dubbo这种RPC框架进行通信。Dubbo有一套Dubbo协议,业务系统之间的通信直接使用Dubbo协议即可,无须再自定义协议。对于其他的比如自研微服务治理平台,那么其内部通信可能就需要自定义一套私有协议。
(3)使用Netty定制私有协议
绝大多数的私有协议的传输层都是基于TCP/IP协议,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。
下面使用Netty提供的异步TCP协议栈开发一个私有协议栈。这个私有协议栈可用于内部各模块之间的通信,它基于TCP/IP协议栈,是一个类HTTP协议的应用层协议栈。相比于传统的标准协议栈,Netty协议栈更加轻巧、灵活。
建立在TCP协议之上的应用层公有协议有:HTTP协议。TCP连接建立后,发送的请求数据时带的数据会按照HTTP协议来组织,返回的响应数据也会按照HTTP协议来组织。
(4)私有协议栈的基本功能
这个私有协议栈可用于承载业务内部各模块之间的消息交互和服务调用,功能有:
一.提供高性能的异步通信能力(基于Netty)
二.提供消息的编码解码能力
三.提供IP白名单认证机制
四.提供链路的保活探测机制
五.提供链路的断连重连机制
2.私有协议栈的通信模型
一.客户端发送握手请求消息
二.服务端对握手请求消息进行合法性校验
三.链路建立成功后,客户端发送业务消息
四.链路建立成功后,服务端发送心跳消息
五.链路建立成功后,客户端发送心跳消息
六.链路建立成功后,服务端发送业务消息
说明一:Netty协议栈的通信双方在链路建立成功后,双方可进行全双工通信。无论客户端还是服务端,都可以主动发送请求消息给对方,通信方可用是TWO WAY或者ONE WAY。
说明二:双方之间的心跳采用Ping-Pong机制。当链路处于空闲时(即链路已经长时间没有通信),客户端主动发送Ping消息给服务端,服务端收到Ping消息后发送应答消息(即Pong消息)给客户端。
说明三:如果客户端连续发送N条Ping消息都没有收到服务端返回的Pong消息,说明链路断开或服务端异常。此时客户端会主动关闭连接,间隔周期T之后再发起重连操作,直到重连成功。
3.私有协议栈的消息定义
任何一个自定义协议,都必须有消息头Header + 消息体Body。消息头Header里会存放一些消息的元数据,消息体Body里会存放完整的请求体数据。
消息头Header里可以放:
一.crcCode(32位的int型)
CRC检验码由三部分组成:第一部分是2个字节的固定值,表明这是某协议的消息。第二部分是1字节的消息主版本号,第三部分是1字节的消息次版本号。
二.length(32位的int型)
这是整个消息的消息长度:包括消息头Header + 消息体Body。
三.SessionId(64位的long型)
建立TCP长连接后,这个长连接就是一个会话,会话ID在集群节点内全局唯一。
四.type(8位Byte型)
表示当前消息的类型。比如0是业务请求消息、1是业务响应消息、2是业务ONE WAY消息(既是请求又是响应消息)、3是握手请求消息、4是握手响应消息、5是心跳请求消息、6是心跳应答消息。
4.私有协议栈链路的建立
如果A节点需要调用B节点的服务,但是A和B之间还没有建立物理层链路,即Netty的connect()方法触发的TCP的三次握手。那么会由调用方主动发起连接,此时调用方为客户端,被调用方为服务端。
建立好物理层链路后,还需要建立应用层链路。此时客户端会发送握手请求消息给服务端,服务端收到握手请求消息后,如果通过IP白名单校验,则返回握手成功应答消息给客户端,应用层链路建立成功。
之后,客户端和服务端就可以互相发送业务消息了。当然,服务端还要处理握手请求超时的情况。
5.私有协议栈链路的关闭
由于客户端和服务端采用长连接进行通信,而且在正常的业务运行期间,双方通过心跳和业务消息来维持链路,所以任何一方都不需要主动关闭连接。
但在以下情况下,客户端和服务端需要关闭连接:
一.一方宕机或重启
二.消息读写IO异常
三.心跳消息读写IO异常
四.心跳超时
五.编码异常
6.私有协议栈的心跳机制
心跳机制用于解决网络超时、闪断、对方进程僵死或者处理缓慢等情况。客户端和服务端会在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路、主动重连。
说明一:每隔时间周期T如10秒,客户端和服务端各自做一次心跳检查。
说明二:当其中一方发现网络处于空闲状态的持续时间已经达到t比如1分钟时,那么就主动发送Ping心跳消息给另一方。
说明三:如果在下一个周期T到来时,一方没有收到另一方发送的Pong心跳应答消息或者业务消息,则心跳失败计数器 + 1。
说明四:每当一方接收到另一方的Pong心跳应答消息或者业务消息时,则将心跳失败计数器清零。连续N次没有收到另一方的Pong心跳应答消息或者业务消息时,则关闭链路。如果是客户端关闭了链路,那么在间隔INTERVAL时间后再发起重连操作。
Ping-Pong双向心跳机制:客户端和服务端共用一个KeepAliveHandler,并各自启动心跳线程。
7.私有协议栈的重连机制
如果链路中断,客户端等待INTERVAL时间后会发起重连操作。如果重连失败,间隔INTERVAL时间后再次发起重连,直到重连成功。
为了保证服务端能有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间后再发起重连,而不是失败后就立即重连。重连失败时,客户端也要保证自身资源能被及时释放。
8.私有协议栈的重复登录保护
当客户端握手成功后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致服务端的句柄资源被耗尽。
服务端接收到客户端的握手请求消息后,首先对IP地址进行合法性检验。如果检验成功,则在会话缓存中查看客户端是否已经登录。如果已经登录,则拒绝重复登录,同时关闭TCP链路。
客户端收到握手失败的消息后,要关闭客户端的TCP连接,等待INTERVAL时间后,再发起TCP连接,直到认证成功。
当服务端连续N次心跳超时后需要主动关闭链路,同时还要清空该客户端的会话缓存信息,以保证可重连成功。
9.私有协议栈核心的ChannelHandler
主要五种ChannelHandler:
一.Packet数据包编码器PacketEncoder
二.Packet数据包解码器PacketDecoder
三.握手机制处理HandShakeHandler
四.会话ID管理SessionIdHandler
五.链路保活处理KeepAliveHandler
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(false))
.addLast(new HandshakeHandler(NettyMode.SERVER.value()))
.addLast(new KeepAliveHandler());
SessionIdHandler会话ID处理器(重复登录保护)在pipeline链条中排第三。
HandshakeHandler握手处理器(握手超时检查)在pipeline链条排第四。
KeepAliveHandler链路保活处理器(心跳检查机制)在pipeline链条排第五。
10.私有协议栈的客户端和服务端
(1)可断网重连的客户端
public class NettyClient {
private static final Logger logger = LogManager.getLogger(NettyClient.class);
private static final int SCHEDULED_THREAD_NUM = 1;
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 4;
private static final int LENGTH_FIELD_LENGTH = 4;
private ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(SCHEDULED_THREAD_NUM);
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
public void connect(final String host, final int port) throws InterruptedException {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(true))
.addLast(new HandshakeHandler(NettyMode.CLIENT.value()))
.addLast(new KeepAliveHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(host, port)).sync();
channelFuture.channel().closeFuture().sync();//阻塞等待直到连接被关闭
} finally {
//如果连接被关闭了,上面的closeFuture().sync()就不会再进行阻塞,于是下面的代码就会被执行进行重连
threadPool.execute(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
try {
//递归调用
connect(host, port);
} catch(InterruptedException e) {
logger.error("Netty client connect error.");
}
} catch (InterruptedException e) {
logger.error("Socket channel close thread interrupted exception.");
}
}
});
}
}
}
(2)私有协议栈的服务端
public class NettyServer {
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 4;
private static final int LENGTH_FIELD_LENGTH = 4;
public void bind(int port) throws Exception {
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(false))
.addLast(new HandshakeHandler(NettyMode.SERVER.value()))
.addLast(new KeepAliveHandler());
}
});
serverBootstrap.bind(port).sync();
}
}
11.私有协议栈的Packet数据包与编解码
(1)Packet数据包的数据结构
(2)从数据读取角度完善Packet数据包
(3)Packet数据包的编码器实现
(4)基于builder设计模式重构Packet数据包
(5)Packet数据包的解码器实现
(1)Packet数据包的数据结构
//数据包
public class Packet {
private Header header = new Header();
private Body body = new Body();
class Header {
//4个字节检验值
private int crc;
//完整数据包的字节数量的大小
private int size;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extend = new HashMap<String, Object>();
}
class Body {
}
}
(2)从数据读取角度完善Packet数据包
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private Header header = new Header();
private Body body = new Body();
class Header {
//4个字节检验值:magic number + major version + minor version
private int crc;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头header字节数量的大小
private int extendHeaderSize;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//body的字节数量的大小
private int bodySize;
}
class Body {
private byte[] bytes;
}
}
(3)Packet数据包的编码器实现
一.序列化工具使用Hessian
public class SerializeUtils {
public static byte[] serialize(Object object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
hessianOutput.writeObject(object);
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
}
public static Object deserialize(byte[] bytes, Class clazz) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
HessianInput hessianInput = new HessianInput(byteArrayInputStream);
Object object = hessianInput.readObject(clazz);
return object;
}
}
二.封装Packet数据包时需要进行序列化和字节大小计算
//Packet数据包编码器
public class PacketEncoder extends MessageToMessageEncoder<Packet> {
protected void encode(ChannelHandlerContext ctx, Packet msg, List<Object> out) throws Exception {
Packet packet = msg;
if (packet == null || packet.getHeader() == null) {
throw new ProtocolException("packet or header is null.");
}
//分配一块不在内存池子里的临时内存块,底层就是一个byte[]字节数组
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(packet.getHeader().getCrc());
byteBuf.writeInt(packet.getPacketSize());
byteBuf.writeLong(packet.getHeader().getSessionId());
byteBuf.writeByte(packet.getHeader().getType());
byteBuf.writeByte(packet.getHeader().getLevel());
byteBuf.writeInt(packet.getHeader().getExtendHeaders().length);
byteBuf.writeBytes(packet.getHeader().getExtendHeaders());
byteBuf.writeInt(packet.getBody().length);
byteBuf.writeBytes(packet.getBody());
out.add(byteBuf);
}
}
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SINGLE_BYTES = 1;
private static final int INT_BYTES = 4;
private static final int LONG_BYTES = 8;
private Header header = new Header();
private Object body;
private byte[] bodyBytes;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
packetSize += LONG_BYTES;
}
public byte getType() { return type; }
public void setType(byte type) {
this.type = type;
packetSize += SINGLE_BYTES;
}
public byte getLevel() { return level; }
public void setLevel(byte level) {
this.level = level;
packetSize += SINGLE_BYTES;
}
public byte[] getExtendHeaders() {
return extendHeadersBytes;
}
public void setExtendHeaders(Map<String, Object> extendHeaders) throws IOException {
this.extendHeaders = extendHeaders;
this.extendHeadersBytes = SerializeUtils.serialize(extendHeaders);
packetSize += INT_BYTES;
packetSize += this.extendHeadersBytes.length;
}
}
class Body {
private byte[] bytes;
}
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public byte[] getBody() { return bodyBytes; }
public void setBody(byte[] body) throws IOException {
this.body = body;
this.bodyBytes = SerializeUtils.serialize(body);
packetSize += INT_BYTES;
packetSize += this.bodyBytes.length;
}
}
(4)基于builder设计模式重构Packet数据包
Builder设计模式:Builder是一个静态内部类、会new一个Packet实例对象。Builder类中除了build()方法返回实例对象,其他方法都是设置实例对象并返回Builder。此Packet类还有一个静态方法builder()返回一个静态内部类Builder的实例对象。
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SESSION_ID_BYTES = 8;
private static final int TYPE_BYTES = 1;
private static final int LEVEL_BYTES = 1;
private static final int EXTEND_HEADERS_SIZE_BYTES = 4;
private static final int BODY_SIZE_BYTES = 4;
private Header header = new Header();
private Object body = new Body();
private byte[] bodyBytes;
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) { this.sessionId = sessionId; }
public byte getType() { return type; }
public void setType(byte type) { this.type = type; }
public byte getLevel() { return level; }
public void setLevel(byte level) { this.level = level; }
public Map<String, Object> getExtendHeaders() { return extendHeaders; }
public void setExtendHeaders(Map<String, Object> extendHeaders) { this.extendHeaders = extendHeaders; }
public byte[] getExtendHeadersBytes() { return extendHeadersBytes; }
public void setExtendHeadersBytes(byte[] extendHeadersBytes) { this.extendHeadersBytes = extendHeadersBytes; }
}
class Body {
private byte[] bytes;
}
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public Object getBody() { return body; }
public void setBody(Object body) { this.body = body; }
public byte[] getBodyBytes() { return bodyBytes; }
public void setBodyBytes(byte[] bodyBytes) { this.bodyBytes = bodyBytes; }
static class Builder {
private Packet packet = new Packet();
public Builder sessionId(long sessionId) { packet.getHeader().setSessionId(sessionId); return this; }
public Builder type(byte type) { packet.getHeader().setType(type); return this; }
public Builder level(byte level) { packet.getHeader().setLevel(level); return this; }
public Builder extendHeader(String key, Object val) { packet.getHeader().getExtendHeaders().put(key, val); return this; }
public Builder body(Object body) { packet.setBody(body); return this; }
public Packet build() throws IOException {
byte[] extendHeadersBytes = SerializeUtils.serialize(packet.getHeader().getExtendHeaders());
packet.getHeader().setExtendHeadersBytes(extendHeadersBytes);
byte[] bodyBytes = SerializeUtils.serialize(packet.getBody());
packet.setBodyBytes(bodyBytes);
int packageSize = ZERO_BYTES + SESSION_ID_BYTES + TYPE_BYTES + LEVEL_BYTES
+ EXTEND_HEADERS_SIZE_BYTES + packet.getHeader().getExtendHeadersBytes().length
+ BODY_SIZE_BYTES + packet.getBodyBytes().length;
packet.setPacketSize(packageSize);
return packet;
}
}
public static Builder builder() {
return new Builder();
}
}
//数据包编码器
public class PacketEncoder extends MessageToMessageEncoder<Packet> {
protected void encode(ChannelHandlerContext ctx, Packet msg, List<Object> out) throws Exception {
Packet packet = msg;
if (packet == null || packet.getHeader() == null) {
throw new ProtocolException("packet or header is null.");
}
//分配一块不在内存池子里的临时内存块,底层就是一个byte[]字节数组
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(packet.getCrc());
byteBuf.writeInt(packet.getPacketSize());
byteBuf.writeLong(packet.getHeader().getSessionId());
byteBuf.writeByte(packet.getHeader().getType());
byteBuf.writeByte(packet.getHeader().getLevel());
byteBuf.writeInt(packet.getHeader().getExtendHeadersBytes().length);
byteBuf.writeBytes(packet.getHeader().getExtendHeadersBytes());
byteBuf.writeInt(packet.getBodyBytes().length);
byteBuf.writeBytes(packet.getBodyBytes());
out.add(byteBuf);
}
}
(5)Packet数据包的解码器实现
一.解码器解码扩展头字段
//数据包解码器
public class PacketDecoder extends LengthFieldBasedFrameDecoder {
//解码器会负责粘包和拆包的处理,确保拿到的ByteBuf是一段完整的协议对象字节数据
//frame是一个数据帧,就是一个完整的协议对象字节数据
//maxFrameLength,设定一下这个协议对象的帧数据的最大的大小
//lengthFieldOffset,代表帧数据有多少自己的field,它的offset是哪一位
//从offset那个位置开始读,读lengthFieldLength个字节数量,就可以了
public PacketDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);//利用父类实现粘包拆包
if (frame == null) {
return null;
}
Packet packet = Packet.builder()
.crc(frame.readInt())
.packetSize(frame.readInt())
.sessionId(frame.readLong())
.type(frame.readByte())
.level(frame.readByte())
.extendHeaders(decodeExtendHeaders(frame))
.body(decodeBody(frame))
.build();
return packet;
}
private Map<String, Object> decodeExtendHeaders(ByteBuf in) throws IOException {
int extendHeadersBytesSize = in.readInt();
byte[] extendHeadersBytes = new byte[extendHeadersBytesSize];
in.readBytes(extendHeadersBytes);
Map<String, Object> extendHeaders = (Map<String, Object>)SerializeUtils.deserialize(extendHeadersBytes, Map.class);
return extendHeaders;
}
private Object decodeBody(ByteBuf in) throws IOException, ClassNotFoundException {
...
}
}
二.解码器解码PacketBody对象
首先给Packet数据包增加bodyClazz字段表明body的类名:
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SESSION_ID_BYTES = 8;
private static final int TYPE_BYTES = 1;
private static final int LEVEL_BYTES = 1;
private static final int EXTEND_HEADERS_SIZE_BYTES = 4;
private static final int BODY_SIZE_BYTES = 4;
private static final int BODY_CLAZZ_SIZE_BYTES = 4;
private Header header = new Header();
private Object body = new Body();
private byte[] bodyBytes;
private String bodyClazz;
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) { this.sessionId = sessionId; }
public byte getType() { return type; }
public void setType(byte type) { this.type = type; }
public byte getLevel() { return level; }
public void setLevel(byte level) { this.level = level; }
public Map<String, Object> getExtendHeaders() { return extendHeaders; }
public void setExtendHeaders(Map<String, Object> extendHeaders) { this.extendHeaders = extendHeaders; }
public byte[] getExtendHeadersBytes() { return extendHeadersBytes; }
public void setExtendHeadersBytes(byte[] extendHeadersBytes) { this.extendHeadersBytes = extendHeadersBytes; }
}
class Body {
private byte[] bytes;
}
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public Object getBody() { return body; }
public void setBody(Object body) { this.body = body; }
public byte[] getBodyBytes() { return bodyBytes; }
public void setBodyBytes(byte[] bodyBytes) { this.bodyBytes = bodyBytes; }
public String getBodyClazz() { return bodyClazz; }
public void setBodyClazz(String bodyClazz) { this.bodyClazz = bodyClazz; }
static class Builder {
private Packet packet = new Packet();
public Builder crc(int crc) { packet.setCrc(crc); return this; }
public Builder packetSize(int packetSize) { packet.setPacketSize(packetSize); return this; }
public Builder sessionId(long sessionId) { packet.getHeader().setSessionId(sessionId); return this; }
public Builder type(byte type) { packet.getHeader().setType(type); return this; }
public Builder level(byte level) { packet.getHeader().setLevel(level); return this; }
public Builder extendHeader(String key, Object val) { packet.getHeader().getExtendHeaders().put(key, val); return this; }
public Builder extendHeaders(Map<String, Object> extendHeaders) { packet.getHeader().setExtendHeaders(extendHeaders); return this; }
public Builder body(Object body) { packet.setBody(body); return this; }
public Packet build() throws IOException {
if (packet.getBody() == null) {
throw new ProtocolException("packet body is null.");
}
if (packet.getPacketSize() > 0) {
return packet;
}
byte[] extendHeadersBytes = SerializeUtils.serialize(packet.getHeader().getExtendHeaders());
packet.getHeader().setExtendHeadersBytes(extendHeadersBytes);
byte[] bodyBytes = SerializeUtils.serialize(packet.getBody());
packet.setBodyBytes(bodyBytes);
packet.setBodyClazz(packet.getBody().getClass().getName());
int packageSize = ZERO_BYTES + SESSION_ID_BYTES + TYPE_BYTES + LEVEL_BYTES
+ EXTEND_HEADERS_SIZE_BYTES + packet.getHeader().getExtendHeadersBytes().length
+ BODY_SIZE_BYTES + packet.getBodyBytes().length
+ BODY_CLAZZ_SIZE_BYTES + packet.getBodyClazz().getBytes().length;
packet.setPacketSize(packageSize);
return packet;
}
}
public static Builder builder() {
return new Builder();
}
}
下面是对字节数组body部分进行解码的逻辑:
//数据包解码器
public class PacketDecoder extends LengthFieldBasedFrameDecoder {
//解码器会负责粘包和拆包的处理,确保拿到的ByteBuf是一段完整的协议对象字节数据
//frame是一个数据帧,就是一个完整的协议对象字节数据
//maxFrameLength,设定一下这个协议对象的帧数据的最大的大小
//lengthFieldOffset,代表帧数据有多少自己的field,它的offset是哪一位
//从offset那个位置开始读,读lengthFieldLength个字节数量,就可以了
public PacketDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
Packet packet = Packet.builder()
.crc(frame.readInt())
.packetSize(frame.readInt())
.sessionId(frame.readLong())
.type(frame.readByte())
.level(frame.readByte())
.extendHeaders(decodeExtendHeaders(frame))
.body(decodeBody(frame))
.build();
return packet;
}
private Map<String, Object> decodeExtendHeaders(ByteBuf in) throws IOException {
int extendHeadersBytesSize = in.readInt();
byte[] extendHeadersBytes = new byte[extendHeadersBytesSize];
in.readBytes(extendHeadersBytes);
Map<String, Object> extendHeaders = (Map<String, Object>)SerializeUtils.deserialize(extendHeadersBytes, Map.class);
return extendHeaders;
}
private Object decodeBody(ByteBuf in) throws IOException, ClassNotFoundException {
int bodyBytesSize = in.readInt();
byte[] bodyBytes = new byte[bodyBytesSize];
in.readBytes(bodyBytes);
int bodyClazzBytesSize = in.readInt();
byte[] bodyClazzBytes = new byte[bodyClazzBytesSize];
in.readBytes(bodyClazzBytes);
String bodyClazzString = new String(bodyClazzBytes);
Class bodyClazz = Class.forName(bodyClazzString);
Object body = SerializeUtils.deserialize(bodyBytes, bodyClazz);
return body;
}
}