【自定义网络协议】Java基于Vert.x的自定义TCP协议实现
在现代的软件开发中,TCP协议广泛应用于需要高效、低延迟数据传输的场景。相较于HTTP协议,TCP提供了更底层的控制和更高的性能,适用于嵌入式设备、实时数据传输等应用。Vert.x是一个基于事件驱动、异步和多线程的高效开发框架,特别适合用于构建TCP服务。本文将介绍如何使用Vert.x在Java中实现自定义TCP协议。
一、环境准备
创建Maven项目:首先,创建一个Maven项目,并添加Vert.x核心依赖。
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.2.7</version>
</dependency>
</dependencies>
二、定义协议格式
在自定义TCP协议时,需要定义协议的格式。这里采用一个简单的二进制格式:
协议结构实体类
/**
* 协议消息结构
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {
/**
* 消息头
*/
private Header header;
/**
* 消息体(请求或响应对象)
*/
private T body;
/**
* 协议消息头
*/
@Data
public static class Header {
/**
* 魔数,保证安全性
*/
private byte magic;
/**
* 版本号
*/
private byte version;
/**
* 序列化器
*/
private byte serializer;
/**
* 消息类型(请求 / 响应)
*/
private byte type;
/**
* 状态
*/
private byte status;
/**
* 请求 id
*/
private long requestId;
/**
* 消息体长度
*/
private int bodyLength;
}
}
三、实现编码器和解码器
vert.x 的TCP 服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此,我们需要编码器和解码器,将 Java 的消息对象和 Buffer进行相互转换。
/**
* 协议消息解码器
*/
public class ProtocolMessageDecoder {
/**
* 解码
*
* @param buffer
* @return
* @throws IOException
*/
public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
// 分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
byte magic = buffer.getByte(0);
// 校验魔数
if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
throw new RuntimeException("消息 magic 非法");
}
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
// 解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化消息的协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
if (messageTypeEnum == null) {
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum) {
case REQUEST:
RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
return new ProtocolMessage<>(header, request);
case RESPONSE:
RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
return new ProtocolMessage<>(header, response);
case HEART_BEAT:
case OTHERS:
default:
throw new RuntimeException("暂不支持该消息类型");
}
}
}
编码器
public class ProtocolMessageEncoder {
/**
* 编码
*
* @param protocolMessage
* @return
* @throws IOException
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
if (protocolMessage == null || protocolMessage.getHeader() == null) {
return Buffer.buffer();
}
ProtocolMessage.Header header = protocolMessage.getHeader();
// 依次向缓冲区写入字节
Buffer buffer = Buffer.buffer();
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());
// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
// 写入 body 长度和数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}
四、实现TCP服务器
创建一个服务处理类,处理请求和响应
public class TcpServerHandler implements Handler<NetSocket> {
@Override
public void handle(NetSocket netSocket) {
// 处理连接
netSocket.handler(buffer -> {
// 接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
// 处理请求
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
netSocket.write(encode);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
});
}
}
实现TCP服务器
public class VertxTcpServer implements HttpServer {
private byte[] handleRequest(byte[] requestData) {
// 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
return "Hello, client!".getBytes();
}
@Override
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 创建 TCP 服务器
NetServer server = vertx.createNetServer();
// 处理请求
server.connectHandler(new TcpServerHandler());
// 启动 TCP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
System.out.println("TCP server started on port " + port);
} else {
System.err.println("Failed to start TCP server: " + result.cause());
}
});
}
public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}
五、编写一个TCP客户端用于测试服务器。
public class VertxTcpClient {
private final static String HOST = "127.0.0.1";
private final static Integer PORT = 8888;
private final static byte SETSERIALIZER = 0x1;
private final static byte VERSION = 0x1;
private final static byte MAGIC = 0x1;
private final static byte TYPE = 0x1;
public void start() {
try {
// 发送 TCP 请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
netClient.connect(PORT, HOST,
result -> {
if (result.succeeded()) {
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
// 发送数据
// 构造消息
ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(MAGIC);
header.setVersion(VERSION);
header.setSerializer((SETSERIALIZER);
header.setType(TYPE);
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(null);
// 编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
// 接收响应
socket.handler(buffer -> {
try {
ProtocolMessage<Response> ResponseProtocolMessage = (ProtocolMessage<Response>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
});
} else {
System.err.println("Failed to connect to TCP server");
}
});
// 记得关闭连接
netClient.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new VertxTcpClient().start();
}
}
六、总结
本文介绍了如何使用Vert.x在Java中实现自定义TCP协议。通过定义协议格式、实现TCP服务器和客户端,我们可以构建高效、低延迟的数据传输系统。Vert.x的异步、事件驱动特性使得它在处理大量并发连接时表现优异,非常适合用于实时通信、物联网等场景。希望本文对你有所帮助,祝你编程愉快!