Netty基础—8.Netty实现私有协议栈二
大纲
1.私有协议介绍
2.私有协议的通信模型
3.私有协议栈的消息定义
4.私有协议栈链路的建立
5.私有协议栈链路的关闭
6.私有协议栈的心跳机制
7.私有协议栈的重连机制
8.私有协议栈的重复登录保护
9.私有协议栈核心的ChannelHandler
10.私有协议栈的客户端和服务端
11.私有协议栈的Packet数据包与编解码
12.私有协议栈的会话ID处理器
13.私有协议栈的握手处理器
14.私有协议栈的链路保活处理器
12.私有协议栈的会话ID处理器
客户端在通道激活时会由会话生成器生成一个会话ID,并利用channelId存放在会话缓存里。服务端在读取数据包时则先尝试根据channelId去会话缓存里获取会话ID,获取不到再从Packet数据包里取出来然后进行缓存。
//负责生成以及传递会话ID处理器handler
public class SessionIdHandler extends ChannelInboundHandlerAdapter {
private boolean needGenerate;
public SessionIdHandler(boolean needGenerate) {
this.needGenerate = needGenerate;
}
//客户端逻辑,通道激活时的处理
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (needGenerate) {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionIdGenerator.generate();
SessionManager sessionManager = SessionManager.getInstance();
sessionManager.putSessionId(channelId, sessionId);
}
//触发往后handler的channelActive
ctx.fireChannelActive();
}
//服务端逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
SessionManager sessionManager = SessionManager.getInstance();
String channelId = ctx.channel().id().asLongText();
Long sessionId = sessionManager.getSessionId(channelId);
if (sessionId == null) {
sessionId = packet.getHeader().getSessionId();
sessionManager.putSessionId(channelId, sessionId);
}
//触发往后handler的channelRead
ctx.fireChannelRead(msg);
}
}
//连接会话管理
public class SessionManager {
private SessionManager() {
}
private static class Singleton {
static SessionManager instance = new SessionManager();
}
public static SessionManager getInstance() {
return Singleton.instance;
}
//用来记录channelId和sessionId的map
private Map<String, Long> sessionIds = new ConcurrentHashMap<String, Long>();
//用来记录address和session的map,可判断请求是否重复
private Map<String, Session> sessions = new ConcurrentHashMap<String, Session>();
public void putSessionId(String channelId, Long sessionId) {
sessionIds.put(channelId, sessionId);
}
public long getSessionId(String channelId) {
return sessionIds.get(channelId);
}
public long getSessionId(ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
return sessionIds.get(channelId);
}
public void putSession(String remoteAddress, Session session) {
sessions.put(remoteAddress, session);
}
public Session getSession(String remoteAddress) {
return sessions.get(remoteAddress);
}
}
//会话id生成组件
public class SessionIdGenerator {
public static long generate() {
String uuid = UUID.randomUUID().toString();
return uuid.hashCode();
}
}
//连接会话
public class Session {
private String remoteAddress;
public Session(String remoteAddress) {
this.remoteAddress = remoteAddress;
}
public String getRemoteAddress() {
return remoteAddress;
}
public void setRemoteAddress(String remoteAddress) {
this.remoteAddress = remoteAddress;
}
}
13.私有协议栈的握手处理器
(1)握手处理器说明
(2)发起握手请求与激活通道
(3)握手请求的重复会话判断与响应
(4)握手请求IP是否在白名单的判断
(5)握手请求的响应以及非法连接问题
(1)握手处理器说明
当连接刚建立通道刚被激活时,客户端需要发起握手请求,服务端则需要启动一个延时线程检查握手是否超时,比如通道激活1分钟后还没有会话ID。
当读取Packet数据包时,需要判断请求是握手请求还是握手响应。客户端处理握手响应,服务端处理握手请求。
服务端处理握手请求时,还要根据白名单IP看是否为非法请求,并根据会话缓存避免重复握手。
所以握手处理器主要包括三个功能:白名单IP处理 + 握手超时处理 + 重复握手处理。
(2)发起握手请求与激活通道
首先在连接刚建立时客户端通过channelActive()方法发起握手请求。
//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
//当连接刚建立时,客户端需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
//触发往后handler的channelActive
ctx.fireChannelActive();
}
//创建握手请求数据包
private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.HandshakeRequest.value())
.level(PacketLevel.DEFAULT.value())
.body(new HandshakeRequest())
.build();
return packet;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
}
...
}
//握手请求
public class HandshakeRequest implements Serializable {
private String requestId = RequestIdGenerator.generate();
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
//请求ID生成组件
public class RequestIdGenerator {
public static String generate() {
return UUID.randomUUID().toString().replace("-", "");
}
}
然后服务端收到握手请求后通过channelRead()方法进行处理,客户端收到握手响应也是通过channelRead()方法进行处理。
//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
//当连接刚建立时,客户端需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
ctx.fireChannelActive();
}
//创建握手请求数据包
private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.HandshakeRequest.value())
.level(PacketLevel.DEFAULT.value())
.body(new HandshakeRequest())
.build();
return packet;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
//对不同请求的处理
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if (isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//是否是握手请求
private boolean isHandshakeRequest(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
}
//是否是握手响应
private boolean isHandshakeResponse(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeResponse.value();
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
...
}
//处理握手响应
private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
...
}
}
public enum PacketType {
HandshakeRequest(1),
HandshakeResponse(2),
KeepAlivePing(3),
KeepAlivePong(4);
byte value;
PacketType(int value) {
this.value = (byte) value;
}
public byte value() {
return value;
}
}
(3)握手请求的重复会话判断与响应
//握手响应
public class HandshakeResponse implements Serializable {
public static final String SESSION_EXISTED_ERROR_MESSAGE = "Session Existed.";
public static final String NOT_IN_WHITE_LIST_ERROR_MESSAGE = "IP is not in white list.";
private String requestId;
private boolean success = true;
private String errorMessage;
//构造函数私有化不给外部进行new
private HandshakeResponse() {
}
public static HandshakeResponse success(String requestId) {
HandshakeResponse handshakeResponse = new HandshakeResponse();
handshakeResponse.setRequestId(requestId);
return handshakeResponse;
}
public static HandshakeResponse error(String requestId, String errorMessage) {
HandshakeResponse handshakeResponse = new HandshakeResponse();
handshakeResponse.setRequestId(requestId);
handshakeResponse.setSuccess(false);
handshakeResponse.setErrorMessage(errorMessage);
return handshakeResponse;
}
public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}
握手请求的重复会话判断处理:
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if(isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//是否是握手请求
private boolean isHandshakeRequest(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
//判断当前连接是否已经存在一个session了
private boolean existSession(ChannelHandlerContext ctx) {
String remoteAddress = ctx.channel().remoteAddress().toString();
SessionManager sessionManager = SessionManager.getInstance();
Session session = sessionManager.getSession(remoteAddress);
return session != null;
}
//创建握手应答Packet对象
private Packet createHandshakeResponsePacket(ChannelHandlerContext ctx, Packet handshakeRequestPacket, boolean success, String errorMessage) throws IOException {
HandshakeRequest handshakeRequest = (HandshakeRequest) handshakeRequestPacket.getBody();
HandshakeResponse handshakeResponse = success ? HandshakeResponse.success(handshakeRequest.getRequestId()) :
HandshakeResponse.error(handshakeRequest.getRequestId(), errorMessage);
Packet packet = Packet.builder()
.sessionId(handshakeRequestPacket.getHeader().getSessionId())
.type(PacketType.HandshakeResponse.value())
.level(PacketLevel.DEFAULT.value())
.body(handshakeResponse)
.build();
return packet;
}
...
}
(4)握手请求IP是否在白名单的判断
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
else if(!inWhiteList(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
//判断发送握手请求过来的机器IP地址,是否在白名单里
private boolean inWhiteList(ChannelHandlerContext ctx) {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = socketAddress.getAddress().getHostAddress();
WhiteListManager whiteListManager = WhiteListManager.getInstance();
boolean inWhiteList = whiteListManager.inWhiteList(ip);
return inWhiteList;
}
...
}
public class WhiteListManager {
private WhiteListManager() {
whiteList.add("125.33.200.123");
}
private static class Singleton {
static WhiteListManager instance = new WhiteListManager();
}
public static WhiteListManager getInstance() {
return Singleton.instance;
}
private List<String> whiteList = new CopyOnWriteArrayList<String>();
public boolean inWhiteList(String ip) {
return whiteList.contains(ip);
}
}
(5)握手请求的响应以及非法连接问题
握手请求的应答处理:
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if(isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
else if(!inWhiteList(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
}
//当前连接不存在重复session,同时握手请求ip地址在白名单里
else {
initSession(ctx);
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, true, null);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
private void initSession(ChannelHandlerContext ctx) {
String remoteAddress = ctx.channel().remoteAddress().toString();
SessionManager sessionManager = SessionManager.getInstance();
sessionManager.putSession(remoteAddress, new Session(remoteAddress));
}
//处理握手响应
private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
HandshakeResponse handshakeResponse = (HandshakeResponse) packet.getBody();
//如果是握手成功了
if (handshakeResponse.isSuccess()) {
logger.info("handshake success.");
}
//如果是握手失败了
else {
logger.error(handshakeResponse.getErrorMessage());
ctx.close();
}
}
...
}
如果客户端非法跟服务端建立一个Netty物理连接后,却一直不发送握手请求,这会导致服务端的连接资源被非法占用。为了解决这个问题,需要进行握手超时检查。无握手、连接资源被非法侵占问题可以通过延时线程解决。
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
private int mode;
public HandshakeHandler(int mode) {
this.mode = mode;
}
//当连接刚被建立时,需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (mode == NettyMode.CLIENT.value()) {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
} else if(mode == NettyMode.SERVER.value()) {
//检查是否在指定时间范围内把握手请求发送过来
new HandshakeRequestTimeoutThread(ctx).start();
}
ctx.fireChannelActive();
}
//握手请求超时没收到检查的线程
private class HandshakeRequestTimeoutThread extends Thread {
private static final long HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD = 1 * 60 * 1000;
private ChannelHandlerContext ctx;
public HandshakeRequestTimeoutThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
//休眠1分钟,1分钟后再继续(1分钟后线程才能被唤醒, await则可以随时被唤醒)
Thread.sleep(HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD);
} catch (InterruptedException e) {
logger.error("HandshakeRequestTimeoutThread interrupted exception.");
}
if (!existSession(ctx)) {
logger.error("Client did not send handshake request in 1 minute.");
ctx.close();
}
}
}
...
}
public enum NettyMode {
CLIENT(1),
SERVER(2);
int value;
Mode(int value) {
this.value = value;
}
int value() {
return value;
}
}
14.私有协议栈的链路保活处理器
(1)链路保活处理器说明
(2)链路保活处理器框架搭建
(3)定时检测长时间未通信的连接
(4)链路保活探测数据包封装与发送
(5)链路保活探测数据包的处理与响应
(6)链路保活探测包发送失败的重试
(1)链路保活处理器说明
当连接刚建立通道刚被激活时,客户端和服务端各自启动一个链路保活的检查线程。该线程会每隔10分钟做一次保活检查,具体的检查判断如下:
一.如果当前时间距离上一次收到数据包已超1小时,则启动链路保活探测,即向对方发送Ping消息
二.发送Ping消息时会记录Ping消息次数,但只要收到Pong消息或业务消息,就要清空该记录
三.只要记录的Ping消息次数超过3次,就关闭连接
(2)链路保活处理器框架搭建
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private long lastPacketTimestamp = -1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
//探测Ping包
public class KeepAlivePing implements Serializable {
private String requestId;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
//探测Pong包
public class KeepAlivePong implements Serializable {
private String requestId;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
(3)定时检测长时间未通信的连接
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
//TODO
}
}
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
(4)链路保活探测数据包封装与发送
启动链路保活探测,发送保活探测包:
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1个小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
try {
Packet keepAlivePingPacket = createKeepAlivePingPacket();
ctx.writeAndFlush(keepAlivePingPacket);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
} catch (Exception e) {
logger.error("keep alive ping packet serialization exception.");
}
}
}
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
(5)链路保活探测数据包的处理与响应
收到保活探测包之后的处理:
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1个小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
...
//收到保活探测包之后的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
keepAlivePingPackets.clear();
}
if (isKeepAlivePingPacket(packet)) {
//收到的是保活探测ping包就构建pong包
Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
ctx.writeAndFlush(keepAlivePongPacket);
} else if(isKeepAlivePongPacket(packet)) {
//收到的是保活探测pong包就代表成功
KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
keepAlivePingPackets.remove(keepAlivePong.getRequestId());
logger.info("Keep alive ping pong success.");
}
ctx.fireChannelRead(msg);
}
private boolean isKeepAlivePingPacket(Packet packet) {
return packet.getHeader().getType() == PacketType.KeepAlivePing.value();
}
private boolean isKeepAlivePongPacket(Packet packet) {
return packet.getHeader().getType() == PacketType.KeepAlivePong.value();
}
private Packet createKeepAlivePongPacket(ChannelHandlerContext ctx, Packet keepAliveRequestPacket) throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAliveRequestPacket.getBody();
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePong.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePong(keepAlivePing.getRequestId()))
.build();
return packet;
}
}
(6)链路保活探测包发送失败的重试
如果10分钟了还没收到第一次发的Ping探测包的Pong响应,那么就进行重试,且最多重试3次。连续成功发送了3次Ping,结果一直没有Pong响应,则关闭连接。
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
...
private static final int KEEP_ALIVE_PING_RETRY_TIMES = 3;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//每隔10分钟检查一下最近一次发送的keep alive ping是否已经收到pong了
if (keepAlivePingPackets.size() > 0) {
//发送重试
if (keepAlivePingPackets.size() < KEEP_ALIVE_PING_RETRY_TIMES) {
if (!sendKeepAlivePingPacketWithRetry(ctx)) {
ctx.close();
}
}
//连续发送成功了3次ping,结果一直没有pong回来,此时也是关闭物理连接
if (keepAlivePingPackets.size() >= KEEP_ALIVE_PING_RETRY_TIMES) {
ctx.close();
}
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
//如果连续重试3次都发送不成功一个探测包,此时直接关闭物理连接
if (!sendKeepAlivePingPacketWithRetry(ctx)) {
ctx.close();
}
}
}
}
//10分钟内如果还没收到第一次发的ping探测包的pong响应,那么就进行重试,且最多重试3次
private boolean sendKeepAlivePingPacketWithRetry(ChannelHandlerContext ctx) {
boolean result = false;
int retryTimes = 0;
while(retryTimes < KEEP_ALIVE_PING_RETRY_TIMES) {
try {
sendKeepAlivePingPacket(ctx);
result = true;
break;
} catch (Exception e) {
logger.error("send keep alive ping packet exception.");
retryTimes++;
}
}
return result;
}
private void sendKeepAlivePingPacket(ChannelHandlerContext ctx) throws IOException {
Packet keepAlivePingPacket = createKeepAlivePingPacket();
ctx.writeAndFlush(keepAlivePingPacket);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
...
}
当有数据包发送过来时,那么就可以清空记录的Ping探测包了。
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
...
//收到保活探测包之后的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
//有包发送过来的时候,不管是什么样的包,ping探测包都可以清空了
lastPacketTimestamp = System.currentTimeMillis();
keepAlivePingPackets.clear();
}
if (isKeepAlivePingPacket(packet)) {
//收到的是保活探测ping包就构建pong包
Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
ctx.writeAndFlush(keepAlivePongPacket);
} else if(isKeepAlivePongPacket(packet)) {
//收到的是保活探测pong包就代表成功
KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
keepAlivePingPackets.remove(keepAlivePong.getRequestId());
logger.info("Keep alive ping pong success.");
}
ctx.fireChannelRead(msg);
}
...
}