当前位置: 首页 > article >正文

Netty基础—8.Netty实现私有协议栈二

大纲

1.私有协议介绍

2.私有协议的通信模型

3.私有协议栈的消息定义

4.私有协议栈链路的建立

5.私有协议栈链路的关闭

6.私有协议栈的心跳机制

7.私有协议栈的重连机制

8.私有协议栈的重复登录保护

9.私有协议栈核心的ChannelHandler

10.私有协议栈的客户端和服务端

11.私有协议栈的Packet数据包与编解码

12.私有协议栈的会话ID处理器

13.私有协议栈的握手处理器

14.私有协议栈的链路保活处理器

12.私有协议栈的会话ID处理器

客户端在通道激活时会由会话生成器生成一个会话ID,并利用channelId存放在会话缓存里。服务端在读取数据包时则先尝试根据channelId去会话缓存里获取会话ID,获取不到再从Packet数据包里取出来然后进行缓存。

//负责生成以及传递会话ID处理器handler
public class SessionIdHandler extends ChannelInboundHandlerAdapter {
    private boolean needGenerate;
    public SessionIdHandler(boolean needGenerate) {
        this.needGenerate = needGenerate;
    }
    
    //客户端逻辑,通道激活时的处理
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (needGenerate) {
            String channelId = ctx.channel().id().asLongText();
            long sessionId = SessionIdGenerator.generate();

            SessionManager sessionManager = SessionManager.getInstance();
            sessionManager.putSessionId(channelId, sessionId);
        }
        //触发往后handler的channelActive
        ctx.fireChannelActive();
    }
    
    //服务端逻辑
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        SessionManager sessionManager = SessionManager.getInstance();
        String channelId = ctx.channel().id().asLongText();
        Long sessionId = sessionManager.getSessionId(channelId);
        if (sessionId == null) {
            sessionId = packet.getHeader().getSessionId();
            sessionManager.putSessionId(channelId, sessionId);
        }
        //触发往后handler的channelRead
        ctx.fireChannelRead(msg);
    }
}

//连接会话管理
public class SessionManager {
    private SessionManager() {
    }    
    private static class Singleton {
        static SessionManager instance = new SessionManager();
    }
    public static SessionManager getInstance() {
        return Singleton.instance;
    }
    
    //用来记录channelId和sessionId的map
    private Map<String, Long> sessionIds = new ConcurrentHashMap<String, Long>();
    //用来记录address和session的map,可判断请求是否重复
    private Map<String, Session> sessions = new ConcurrentHashMap<String, Session>();
    public void putSessionId(String channelId, Long sessionId) {
        sessionIds.put(channelId, sessionId);
    }
    public long getSessionId(String channelId) {
        return sessionIds.get(channelId);
    }
    public long getSessionId(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        return sessionIds.get(channelId);
    }
    public void putSession(String remoteAddress, Session session) {
        sessions.put(remoteAddress, session);
    }
    public Session getSession(String remoteAddress) {
        return sessions.get(remoteAddress);
    }
}

//会话id生成组件
public class SessionIdGenerator {
    public static long generate() {
        String uuid = UUID.randomUUID().toString();
        return uuid.hashCode();
    }
}

//连接会话
public class Session {
    private String remoteAddress;
    public Session(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    public String getRemoteAddress() {
        return remoteAddress;
    }
    public void setRemoteAddress(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
}

13.私有协议栈的握手处理器

(1)握手处理器说明

(2)发起握手请求与激活通道

(3)握手请求的重复会话判断与响应

(4)握手请求IP是否在白名单的判断

(5)握手请求的响应以及非法连接问题

(1)握手处理器说明

当连接刚建立通道刚被激活时,客户端需要发起握手请求,服务端则需要启动一个延时线程检查握手是否超时,比如通道激活1分钟后还没有会话ID。

当读取Packet数据包时,需要判断请求是握手请求还是握手响应。客户端处理握手响应,服务端处理握手请求。

服务端处理握手请求时,还要根据白名单IP看是否为非法请求,并根据会话缓存避免重复握手。

所以握手处理器主要包括三个功能:白名单IP处理 + 握手超时处理 + 重复握手处理。

(2)发起握手请求与激活通道

首先在连接刚建立时客户端通过channelActive()方法发起握手请求。

//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
    
    //当连接刚建立时,客户端需要发送握手请求
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asLongText();
        long sessionId = SessionManager.getInstance().getSessionId(channelId);
        Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
        ctx.writeAndFlush(handshakeRequestPacket);
        //触发往后handler的channelActive
        ctx.fireChannelActive();
    }
    
    //创建握手请求数据包
    private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
        Packet packet = Packet.builder()
            .sessionId(sessionId)
            .type(PacketType.HandshakeRequest.value())
            .level(PacketLevel.DEFAULT.value())
            .body(new HandshakeRequest())
            .build();
        return packet;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ...
    }
    ...
}

//握手请求
public class HandshakeRequest implements Serializable {
    private String requestId = RequestIdGenerator.generate();
    public String getRequestId() {
        return requestId;
    }
    
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }
}

//请求ID生成组件
public class RequestIdGenerator {
    public static String generate() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}

然后服务端收到握手请求后通过channelRead()方法进行处理,客户端收到握手响应也是通过channelRead()方法进行处理。

//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
    
    //当连接刚建立时,客户端需要发送握手请求
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asLongText();
        long sessionId = SessionManager.getInstance().getSessionId(channelId);
        Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
        ctx.writeAndFlush(handshakeRequestPacket);
        ctx.fireChannelActive();
    }
    
    //创建握手请求数据包
    private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
        Packet packet = Packet.builder()
            .sessionId(sessionId)
            .type(PacketType.HandshakeRequest.value())
            .level(PacketLevel.DEFAULT.value())
            .body(new HandshakeRequest())
            .build();
        return packet;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        //对不同请求的处理
        if (isHandshakeRequest(packet)) {
            handleHandshakeRequest(ctx, packet);
        } else if (isHandshakeResponse(packet)) {
            handleHandshakeResponse(ctx, packet);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    //是否是握手请求
    private boolean isHandshakeRequest(Packet packet) {
        return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
    }
    
    //是否是握手响应
    private boolean isHandshakeResponse(Packet packet) {
        return packet.getHeader().getType() == PacketType.HandshakeResponse.value();
    }
    
    //处理握手请求
    private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
        ...
    }
    
    //处理握手响应
    private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
        ...
    }
}

public enum PacketType {
    HandshakeRequest(1),
    HandshakeResponse(2),
    KeepAlivePing(3),
    KeepAlivePong(4);
    byte value;
    
    PacketType(int value) {
        this.value = (byte) value;
    }
    
    public byte value() {
        return value;
    }
}

(3)握手请求的重复会话判断与响应

//握手响应
public class HandshakeResponse implements Serializable {
    public static final String SESSION_EXISTED_ERROR_MESSAGE = "Session Existed.";
    public static final String NOT_IN_WHITE_LIST_ERROR_MESSAGE = "IP is not in white list.";
    private String requestId;
    private boolean success = true;
    private String errorMessage;

    //构造函数私有化不给外部进行new
    private HandshakeResponse() {

    }

    public static HandshakeResponse success(String requestId) {
        HandshakeResponse handshakeResponse = new HandshakeResponse();
        handshakeResponse.setRequestId(requestId);
        return handshakeResponse;
    }

    public static HandshakeResponse error(String requestId, String errorMessage) {
        HandshakeResponse handshakeResponse = new HandshakeResponse();
        handshakeResponse.setRequestId(requestId);
        handshakeResponse.setSuccess(false);
        handshakeResponse.setErrorMessage(errorMessage);
        return handshakeResponse;
    }

    public String getRequestId() { return requestId; }
    public void setRequestId(String requestId) { this.requestId = requestId; }
    public boolean isSuccess() { return success; }
    public void setSuccess(boolean success) { this.success = success; }
    public String getErrorMessage() { return errorMessage; }
    public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}

握手请求的重复会话判断处理:

public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (isHandshakeRequest(packet)) {
            handleHandshakeRequest(ctx, packet);
        } else if(isHandshakeResponse(packet)) {
            handleHandshakeResponse(ctx, packet);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    //是否是握手请求
    private boolean isHandshakeRequest(Packet packet) {
        return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
    }
    
    //处理握手请求
    private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
        Packet handshakeResponsePacket = null;
        //如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
        if (existSession(ctx)) {
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
        }
        ctx.writeAndFlush(handshakeResponsePacket);
    }
    
    //判断当前连接是否已经存在一个session了
    private boolean existSession(ChannelHandlerContext ctx) {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        SessionManager sessionManager = SessionManager.getInstance();
        Session session = sessionManager.getSession(remoteAddress);
        return session != null;
    }
    
    //创建握手应答Packet对象
    private Packet createHandshakeResponsePacket(ChannelHandlerContext ctx, Packet handshakeRequestPacket, boolean success, String errorMessage) throws IOException {
        HandshakeRequest handshakeRequest = (HandshakeRequest) handshakeRequestPacket.getBody();
        HandshakeResponse handshakeResponse = success ? HandshakeResponse.success(handshakeRequest.getRequestId()) : 
        HandshakeResponse.error(handshakeRequest.getRequestId(), errorMessage);
        Packet packet = Packet.builder()
            .sessionId(handshakeRequestPacket.getHeader().getSessionId())
            .type(PacketType.HandshakeResponse.value())
            .level(PacketLevel.DEFAULT.value())
            .body(handshakeResponse)
            .build();
        return packet;
    }
    ...
}

(4)握手请求IP是否在白名单的判断

public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    ...
    //处理握手请求
    private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
        Packet handshakeResponsePacket = null;
        //如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
        if (existSession(ctx)) {
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
        }
        //如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
        else if(!inWhiteList(ctx)) {
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
        }
        ctx.writeAndFlush(handshakeResponsePacket);
    }
    
    //判断发送握手请求过来的机器IP地址,是否在白名单里
    private boolean inWhiteList(ChannelHandlerContext ctx) {
        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = socketAddress.getAddress().getHostAddress();
        WhiteListManager whiteListManager = WhiteListManager.getInstance();
        boolean inWhiteList = whiteListManager.inWhiteList(ip);
        return inWhiteList;
    }
    ...
}

public class WhiteListManager {
    private WhiteListManager() {
        whiteList.add("125.33.200.123");
    }
    private static class Singleton {
        static WhiteListManager instance = new WhiteListManager();
    }
    public static WhiteListManager getInstance() {
        return Singleton.instance;
    }
    private List<String> whiteList = new CopyOnWriteArrayList<String>();
    public boolean inWhiteList(String ip) {
        return whiteList.contains(ip);
    }
}

(5)握手请求的响应以及非法连接问题

握手请求的应答处理:

public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (isHandshakeRequest(packet)) {
            handleHandshakeRequest(ctx, packet);
        } else if(isHandshakeResponse(packet)) {
            handleHandshakeResponse(ctx, packet);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    //处理握手请求
    private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
        Packet handshakeResponsePacket = null;
        //如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
        if (existSession(ctx)) {
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
        }
        //如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
        else if(!inWhiteList(ctx)) {
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
        }
        //当前连接不存在重复session,同时握手请求ip地址在白名单里
        else {
            initSession(ctx);
            handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, true, null);
        }
        ctx.writeAndFlush(handshakeResponsePacket);
    }
    
    private void initSession(ChannelHandlerContext ctx) {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        SessionManager sessionManager = SessionManager.getInstance();
        sessionManager.putSession(remoteAddress, new Session(remoteAddress));
    }
    
    //处理握手响应
    private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
        HandshakeResponse handshakeResponse = (HandshakeResponse) packet.getBody();
        //如果是握手成功了
        if (handshakeResponse.isSuccess()) {
            logger.info("handshake success.");
        }
        //如果是握手失败了
        else {
            logger.error(handshakeResponse.getErrorMessage());
            ctx.close();
        }
    }
    ...
}

如果客户端非法跟服务端建立一个Netty物理连接后,却一直不发送握手请求,这会导致服务端的连接资源被非法占用。为了解决这个问题,需要进行握手超时检查。无握手、连接资源被非法侵占问题可以通过延时线程解决。

public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    ...
    private int mode;
    public HandshakeHandler(int mode) {
        this.mode = mode;
    }
    
    //当连接刚被建立时,需要发送握手请求
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (mode == NettyMode.CLIENT.value()) {
            String channelId = ctx.channel().id().asLongText();
            long sessionId = SessionManager.getInstance().getSessionId(channelId);
            Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
            ctx.writeAndFlush(handshakeRequestPacket);
        } else if(mode == NettyMode.SERVER.value()) {
            //检查是否在指定时间范围内把握手请求发送过来
            new HandshakeRequestTimeoutThread(ctx).start();
        }
        ctx.fireChannelActive();
    }
    
    //握手请求超时没收到检查的线程
    private class HandshakeRequestTimeoutThread extends Thread {
        private static final long HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD = 1 * 60 * 1000;
        private ChannelHandlerContext ctx;
        public HandshakeRequestTimeoutThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        
        @Override
        public void run() {
            try {
                //休眠1分钟,1分钟后再继续(1分钟后线程才能被唤醒, await则可以随时被唤醒)
                Thread.sleep(HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD);
            } catch (InterruptedException e) {
                logger.error("HandshakeRequestTimeoutThread interrupted exception.");
            }
            if (!existSession(ctx)) {
                logger.error("Client did not send handshake request in 1 minute.");
                ctx.close();
            }
        }
    }
    ...
}

public enum NettyMode {
    CLIENT(1),
    SERVER(2);
    int value;
    Mode(int value) {
        this.value = value;
    }
    int value() {
        return value;
    }
}

14.私有协议栈的链路保活处理器

(1)链路保活处理器说明

(2)链路保活处理器框架搭建

(3)定时检测长时间未通信的连接

(4)链路保活探测数据包封装与发送

(5)链路保活探测数据包的处理与响应

(6)链路保活探测包发送失败的重试

(1)链路保活处理器说明

当连接刚建立通道刚被激活时,客户端和服务端各自启动一个链路保活的检查线程。该线程会每隔10分钟做一次保活检查,具体的检查判断如下:

一.如果当前时间距离上一次收到数据包已超1小时,则启动链路保活探测,即向对方发送Ping消息

二.发送Ping消息时会记录Ping消息次数,但只要收到Pong消息或业务消息,就要清空该记录

三.只要记录的Ping消息次数超过3次,就关闭连接

(2)链路保活处理器框架搭建

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    private long lastPacketTimestamp = -1;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new KeepAliveThread(ctx).start();
    }
    
    //通信链路保活检查线程
    private class KeepAliveThread extends Thread {
        private ChannelHandlerContext ctx;
        public KeepAliveThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        @Override
        public void run() {

        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (packet != null) {
            lastPacketTimestamp = System.currentTimeMillis();
        }
        ctx.fireChannelRead(msg);
    }
}

//探测Ping包
public class KeepAlivePing implements Serializable {
    private String requestId;
    public String getRequestId() {
        return requestId;
    }
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }
}

//探测Pong包
public class KeepAlivePong implements Serializable {
    private String requestId;
    public String getRequestId() {
        return requestId;
    }
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }
}

(3)定时检测长时间未通信的连接

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
    //每隔10分钟做一次保活检查
    private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
    //1小时都没有通信就开启链路保活探测
    private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
    private long lastPacketTimestamp = -1;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new KeepAliveThread(ctx).start();
    }
    
    //通信链路保活检查线程
    private class KeepAliveThread extends Thread {
        private ChannelHandlerContext ctx;
        private int keepAlivePingRetryTimes = 0;
        
        public KeepAliveThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        
        @Override
        public void run() {
            for(;;) {
                //每隔10分钟做一次链路保活检查
                try {
                    sleep(KEEP_ALIVE_CHECK_INTERNAL);
                } catch (InterruptedException e) {
                    logger.error("Keep alive thread interrupted exception.");
                }

                //当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
                long now = System.currentTimeMillis();
                if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
                    //TODO                    
                }
            }
        }

        private Packet createKeepAlivePingPacket() throws IOException {
            SessionManager sessionManager = SessionManager.getInstance();
            long sessionId = sessionManager.getSessionId(ctx);
            Packet packet = Packet.builder()
                .sessionId(sessionId)
                .type(PacketType.KeepAlivePing.value())
                .level(PacketLevel.DEFAULT.value())
                .body(new KeepAlivePing())
                .build();
            return packet;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (packet != null) {
            lastPacketTimestamp = System.currentTimeMillis();
        }
        ctx.fireChannelRead(msg);
    }
}

(4)链路保活探测数据包封装与发送

启动链路保活探测,发送保活探测包:

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
    //每隔10分钟做一次保活检查
    private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
    //1个小时都没有通信就开启链路保活探测
    private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;

    private long lastPacketTimestamp = -1;
    //存放对于该长连接已经发送的保活探测包请求
    //map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
    private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new KeepAliveThread(ctx).start();
    }
    
    //通信链路保活检查线程
    private class KeepAliveThread extends Thread {
        private ChannelHandlerContext ctx;
        private int keepAlivePingRetryTimes = 0;
        
        public KeepAliveThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        
        @Override
        public void run() {
            for(;;) {
                //每隔10分钟做一次链路保活检查
                try {
                    sleep(KEEP_ALIVE_CHECK_INTERNAL);
                } catch (InterruptedException e) {
                    logger.error("Keep alive thread interrupted exception.");
                }

                //当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
                long now = System.currentTimeMillis();
                if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
                    try {
                        Packet keepAlivePingPacket = createKeepAlivePingPacket();
                        ctx.writeAndFlush(keepAlivePingPacket);

                        KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
                        keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
                    } catch (Exception e) {
                        logger.error("keep alive ping packet serialization exception.");
                    }
                }
            }
        }

        private Packet createKeepAlivePingPacket() throws IOException {
            SessionManager sessionManager = SessionManager.getInstance();
            long sessionId = sessionManager.getSessionId(ctx);
            Packet packet = Packet.builder()
                .sessionId(sessionId)
                .type(PacketType.KeepAlivePing.value())
                .level(PacketLevel.DEFAULT.value())
                .body(new KeepAlivePing())
                .build();
            return packet;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (packet != null) {
            lastPacketTimestamp = System.currentTimeMillis();
        }
        ctx.fireChannelRead(msg);
    }
}

(5)链路保活探测数据包的处理与响应

收到保活探测包之后的处理:

//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
    //每隔10分钟做一次保活检查
    private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
    //1个小时都没有通信就开启链路保活探测
    private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;

    private long lastPacketTimestamp = -1;
    //存放对于该长连接已经发送的保活探测包请求
    //map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
    private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new KeepAliveThread(ctx).start();
    }
    ...
    //收到保活探测包之后的处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (packet != null) {
            lastPacketTimestamp = System.currentTimeMillis();
            keepAlivePingPackets.clear();
        }
        if (isKeepAlivePingPacket(packet)) {
            //收到的是保活探测ping包就构建pong包
            Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
            ctx.writeAndFlush(keepAlivePongPacket);
        }  else if(isKeepAlivePongPacket(packet)) {
            //收到的是保活探测pong包就代表成功
            KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
            keepAlivePingPackets.remove(keepAlivePong.getRequestId());
            logger.info("Keep alive ping pong success.");
        }
        ctx.fireChannelRead(msg);
    }
    private boolean isKeepAlivePingPacket(Packet packet) {
        return packet.getHeader().getType() == PacketType.KeepAlivePing.value();
    }
    private boolean isKeepAlivePongPacket(Packet packet) {
        return packet.getHeader().getType() == PacketType.KeepAlivePong.value();
    }
    private Packet createKeepAlivePongPacket(ChannelHandlerContext ctx, Packet keepAliveRequestPacket) throws IOException {
        SessionManager sessionManager = SessionManager.getInstance();
        long sessionId = sessionManager.getSessionId(ctx);
        KeepAlivePing keepAlivePing = (KeepAlivePing) keepAliveRequestPacket.getBody();
        Packet packet = Packet.builder()
            .sessionId(sessionId)
            .type(PacketType.KeepAlivePong.value())
            .level(PacketLevel.DEFAULT.value())
            .body(new KeepAlivePong(keepAlivePing.getRequestId()))
            .build();
        return packet;
    }
}

(6)链路保活探测包发送失败的重试

如果10分钟了还没收到第一次发的Ping探测包的Pong响应,那么就进行重试,且最多重试3次。连续成功发送了3次Ping,结果一直没有Pong响应,则关闭连接。

public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    ...
    private static final int KEEP_ALIVE_PING_RETRY_TIMES = 3;
    private long lastPacketTimestamp = -1;
    //存放对于该长连接已经发送的保活探测包请求
    //map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
    private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new KeepAliveThread(ctx).start();
    }
    
    //通信链路保活检查线程
    private class KeepAliveThread extends Thread {
        private ChannelHandlerContext ctx;
        private int keepAlivePingRetryTimes = 0;
        
        public KeepAliveThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        
        @Override
        public void run() {
            for(;;) {
                //每隔10分钟做一次链路保活检查
                try {
                    sleep(KEEP_ALIVE_CHECK_INTERNAL);
                } catch (InterruptedException e) {
                    logger.error("Keep alive thread interrupted exception.");
                }
                //每隔10分钟检查一下最近一次发送的keep alive ping是否已经收到pong了
                if (keepAlivePingPackets.size() > 0) {
                    //发送重试
                    if (keepAlivePingPackets.size() < KEEP_ALIVE_PING_RETRY_TIMES) {
                        if (!sendKeepAlivePingPacketWithRetry(ctx)) {
                            ctx.close();
                        }
                    }
                    //连续发送成功了3次ping,结果一直没有pong回来,此时也是关闭物理连接
                    if (keepAlivePingPackets.size() >= KEEP_ALIVE_PING_RETRY_TIMES) {
                        ctx.close();
                    }
                }
                //当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
                long now = System.currentTimeMillis();
                if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
                    //如果连续重试3次都发送不成功一个探测包,此时直接关闭物理连接
                    if (!sendKeepAlivePingPacketWithRetry(ctx)) {
                        ctx.close();
                    }
                }
            }
        }
        
        //10分钟内如果还没收到第一次发的ping探测包的pong响应,那么就进行重试,且最多重试3次
        private boolean sendKeepAlivePingPacketWithRetry(ChannelHandlerContext ctx) {
            boolean result = false;
            int retryTimes = 0;
    
            while(retryTimes < KEEP_ALIVE_PING_RETRY_TIMES) {
                try {
                    sendKeepAlivePingPacket(ctx);
                    result = true;
                    break;
                } catch (Exception e) {
                    logger.error("send keep alive ping packet exception.");
                    retryTimes++;
                }
            }
            return result;
        }
        
        private void sendKeepAlivePingPacket(ChannelHandlerContext ctx) throws IOException {
            Packet keepAlivePingPacket = createKeepAlivePingPacket();
            ctx.writeAndFlush(keepAlivePingPacket);
            KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
            keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
        }
        
        private Packet createKeepAlivePingPacket() throws IOException {
            SessionManager sessionManager = SessionManager.getInstance();
            long sessionId = sessionManager.getSessionId(ctx);
            Packet packet = Packet.builder()
                .sessionId(sessionId)
                .type(PacketType.KeepAlivePing.value())
                .level(PacketLevel.DEFAULT.value())
                .body(new KeepAlivePing())
                .build();
            return packet;
        }
    }
    ...
}

当有数据包发送过来时,那么就可以清空记录的Ping探测包了。

public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
    ...
    //收到保活探测包之后的处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Packet packet = (Packet) msg;
        if (packet != null) {
            //有包发送过来的时候,不管是什么样的包,ping探测包都可以清空了
            lastPacketTimestamp = System.currentTimeMillis();
            keepAlivePingPackets.clear();
        }
        if (isKeepAlivePingPacket(packet)) {
            //收到的是保活探测ping包就构建pong包
            Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
            ctx.writeAndFlush(keepAlivePongPacket);
        }  else if(isKeepAlivePongPacket(packet)) {
            //收到的是保活探测pong包就代表成功
            KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
            keepAlivePingPackets.remove(keepAlivePong.getRequestId());
            logger.info("Keep alive ping pong success.");
        }
        ctx.fireChannelRead(msg);
    }
    ...
}


http://www.kler.cn/a/590725.html

相关文章:

  • 激光slam学习笔记10---ubuntu2004部署运行fastlivo2踩坑记录
  • 【Ratis】ReferenceCountedObject接口的作用及参考意义
  • springboot多种生产打包方式教程
  • 【从零开始学习计算机】计算机网络(一)计算机网络分层结构
  • javaEE————文件IO(1)
  • MySQL使用pxc实现高可用
  • Day34 | 300. 最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组、1143. 最长公共子序列
  • 卓越的用户体验需要智能内容
  • MiddleVR for Unity插件
  • Linux FILE文件操作1-文件指针、文件缓冲区(行缓冲、全缓冲、无缓冲)的验证
  • Java学习------抽象类和接口
  • 图解AUTOSAR_CP_WatchdogDriver
  • 什么是梯度方差和缩放因子
  • Effective C++ 剖析(条款1~9)
  • 前端面试:axios 是否可以取消请求?
  • jmeter分布式原理及实例
  • 流量分析实践
  • Elixir语言的开源贡献
  • 【NLP】2. TF-IDF(Log 形式)
  • 评估大语言模型挑战和方法-AAAI2025