当前位置: 首页 > article >正文

im-system学习

文章目录

    • LimServer
    • LimServer
    • snakeyaml
      • 依赖
      • 使用
      • 配置类
      • 配置文件
    • 私有协议解码
      • MessageDecoder
      • ByteBufToMessageUtils

这个很全: IM即时通讯系统[SpringBoot+Netty]——梳理(总)

IO线程模型

Redis 分布式客户端 Redisson 分布式锁快速入门

LimServer

public class LimServer {

    private final static Logger logger = LoggerFactory.getLogger(LimServer.class);
    BootstrapConfig.TcpConfig config;
    EventLoopGroup mainGroup;
    EventLoopGroup subGroup;
    ServerBootstrap server;

    public LimServer(BootstrapConfig.TcpConfig config) {
        this.config = config;
        mainGroup = new NioEventLoopGroup(config.getBossThreadSize());
        subGroup = new NioEventLoopGroup(config.getWorkThreadSize());
        server = new ServerBootstrap();
        server.group(mainGroup,subGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageDecoder());
                        ch.pipeline().addLast(new MessageEncoder());
//                        ch.pipeline().addLast(new IdleStateHandler(
//                                0, 0,
//                                10));
                        ch.pipeline().addLast(new HeartBeatHandler(config.getHeartBeatTime()));
                        ch.pipeline().addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
                    }
                });
    }

    public void start(){
        this.server.bind(this.config.getTcpPort());
    }


}

LimServer

public class LimWebSocketServer {

    private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);

    BootstrapConfig.TcpConfig config;
    EventLoopGroup mainGroup;
    EventLoopGroup subGroup;
    ServerBootstrap server;

    public LimWebSocketServer(BootstrapConfig.TcpConfig config) {
        this.config = config;
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // websocket 基于http协议,所以要有http编解码器
                        pipeline.addLast("http-codec", new HttpServerCodec());
                        // 对写大数据流的支持
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                        // 几乎在netty中的编程,都会使用到此hanler
                        pipeline.   addLast("aggregator", new HttpObjectAggregator(65535));
                        /**
                         * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
                         * 本handler会帮你处理一些繁重的复杂的事
                         * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
                         * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                        pipeline.addLast(new WebSocketMessageDecoder());
                        pipeline.addLast(new WebSocketMessageEncoder());
                        pipeline.addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
                    }
                });
    }

    public void start(){
        this.server.bind(this.config.getWebSocketPort());
    }
}

snakeyaml

依赖

<!-- yaml -->
<dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
    <version>${snakeyaml.version}</version>
</dependency>

使用

    private static void start(String path){
        try {
            Yaml yaml = new Yaml();
            InputStream inputStream = new FileInputStream(path);
            BootstrapConfig bootstrapConfig = yaml.loadAs(inputStream, BootstrapConfig.class);

            new LimServer(bootstrapConfig.getLim()).start();
            new LimWebSocketServer(bootstrapConfig.getLim()).start();

            RedisManager.init(bootstrapConfig);
            MqFactory.init(bootstrapConfig.getLim().getRabbitmq());
            MessageReciver.init(bootstrapConfig.getLim().getBrokerId()+"");
            registerZK(bootstrapConfig);

        }catch (Exception e){
            e.printStackTrace();
            System.exit(500);
        }
    }

配置类

@Data
public class BootstrapConfig {

    private TcpConfig lim;


    @Data
    public static class TcpConfig {
        private Integer tcpPort;// tcp 绑定的端口号

        private Integer webSocketPort; // webSocket 绑定的端口号

        private boolean enableWebSocket; //是否启用webSocket

        private Integer bossThreadSize; // boss线程 默认=1

        private Integer workThreadSize; //work线程

        private Long heartBeatTime; //心跳超时时间 单位毫秒

        private Integer loginModel;

        /**
         * redis配置
         */
        private RedisConfig redis;

        /**
         * rabbitmq配置
         */
        private Rabbitmq rabbitmq;

        /**
         * zk配置
         */
        private ZkConfig zkConfig;

        /**
         * brokerId
         */
        private Integer brokerId;

        private String logicUrl;

    }

    @Data
    public static class ZkConfig {
        /**
         * zk连接地址
         */
        private String zkAddr;

        /**
         * zk连接超时时间
         */
        private Integer zkConnectTimeOut;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RedisConfig {

        /**
         * 单机模式:single 哨兵模式:sentinel 集群模式:cluster
         */
        private String mode;
        /**
         * 数据库
         */
        private Integer database;
        /**
         * 密码
         */
        private String password;
        /**
         * 超时时间
         */
        private Integer timeout;
        /**
         * 最小空闲数
         */
        private Integer poolMinIdle;
        /**
         * 连接超时时间(毫秒)
         */
        private Integer poolConnTimeout;
        /**
         * 连接池大小
         */
        private Integer poolSize;

        /**
         * redis单机配置
         */
        private RedisSingle single;

    }

    /**
     * redis单机配置
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RedisSingle {
        /**
         * 地址
         */
        private String address;
    }

    /**
     * rabbitmq哨兵模式配置
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Rabbitmq {
        private String host;

        private Integer port;

        private String virtualHost;

        private String userName;

        private String password;
    }

}

配置文件

lim:
  tcpPort: 9000
  webSocketPort: 19000
  bossThreadSize: 1
  workThreadSize: 8
  heartBeatTime: 20000 #心跳超时时间 单位毫秒
  brokerId: 1000
  loginModel: 3
  logicUrl: http://127.0.0.1:8000/v1
  #  *                多端同步模式:1 只允许一端在线,手机/电脑/web 踢掉除了本client+imel的设备
  #  *                            2 允许手机/电脑的一台设备 + web在线 踢掉除了本client+imel的非web端设备
  #  *                            3 允许手机和电脑单设备 + web 同时在线 踢掉非本client+imel的同端设备
  #  *                            4 允许所有端多设备登录 不踢任何设备

  redis:
    mode: single # 单机模式:single 哨兵模式:sentinel 集群模式:cluster
    database: 0
    password:
    timeout: 3000 # 超时时间
    poolMinIdle: 8 #最小空闲数
    poolConnTimeout: 3000 # 连接超时时间(毫秒)
    poolSize: 10 # 连接池大小
    single: #redis单机配置
      address: 127.0.0.1:6379
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtualHost: /
    userName: guest
    password: guest

  zkConfig:
    zkAddr: 127.0.0.1:2181
    zkConnectTimeOut: 5000

私有协议解码

MessageDecoder

public class MessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) throws Exception {
    //请求头(指令
        // 版本
        // clientType
        // 消息解析类型
        // appId
        // imei长度
        // bodylen)+ imei号 + 请求体

        if(in.readableBytes() < 28){
            return;
        }

        Message message = ByteBufToMessageUtils.transition(in);
        if(message == null){
            return;
        }

        out.add(message);
    }
}

ByteBufToMessageUtils

/**
 * @author: Chackylee
 * @description: 将ByteBuf转化为Message实体,根据私有协议转换
 *               私有协议规则,
 *               4位表示Command表示消息的开始,
 *               4位表示version
 *               4位表示clientType
 *               4位表示messageType
 *               4位表示appId
 *               4位表示imei长度
 *               imei
 *               4位表示数据长度
 *               data
 *               后续将解码方式加到数据头根据不同的解码方式解码,如pb,json,现在用json字符串
 * @version: 1.0
 */
public class ByteBufToMessageUtils {

    public static Message transition(ByteBuf in){

        /** 获取command*/
        int command = in.readInt();

        /** 获取version*/
        int version = in.readInt();

        /** 获取clientType*/
        int clientType = in.readInt();

        /** 获取clientType*/
        int messageType = in.readInt();

        /** 获取appId*/
        int appId = in.readInt();

        /** 获取imeiLength*/
        int imeiLength = in.readInt();

        /** 获取bodyLen*/
        int bodyLen = in.readInt();

        if(in.readableBytes() < bodyLen + imeiLength){
            in.resetReaderIndex();
            return null;
        }

        byte [] imeiData = new byte[imeiLength];
        in.readBytes(imeiData);
        String imei = new String(imeiData);

        byte [] bodyData = new byte[bodyLen];
        in.readBytes(bodyData);


        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setAppId(appId);
        messageHeader.setClientType(clientType);
        messageHeader.setCommand(command);
        messageHeader.setLength(bodyLen);
        messageHeader.setVersion(version);
        messageHeader.setMessageType(messageType);
        messageHeader.setImei(imei);

        Message message = new Message();
        message.setMessageHeader(messageHeader);

        if(messageType == 0x0){
            String body = new String(bodyData);
            JSONObject parse = (JSONObject) JSONObject.parse(body);
            message.setMessagePack(parse);
        }

        in.markReaderIndex();
        return message;
    }

}


http://www.kler.cn/a/274377.html

相关文章:

  • 深度学习试题及答案解析(二)
  • 搭建MPI/CUDA开发环境
  • PostgreSQL技术内幕21:SysLogger日志收集器的工作原理
  • vue预览和下载 pdf、ppt、word、excel文档,文件类型为链接或者base64格式或者文件流,
  • uniapp video组件无法播放视频解决方案
  • 0基础学java之Day29(单例模式、死锁)
  • 嵌入式学习-ARM-Day4
  • 【FPGA】摄像头模块OV5640
  • Linux系统及操作 (05)
  • 【ESP32接入国产大模型之MiniMax】
  • Python入门(小白友好)
  • Springboot和Spring Cloud版本对应
  • ClickHouse--13--springboot+mybatis配置clickhouse
  • 红与黑(c++题解)
  • 【复现】【免费】基于多时间尺度滚动优化的多能源微网双层调度模型
  • springboot校服订购系统
  • 阿里云发布 AI 编程助手 “通义灵码”——VSCode更强了 !!
  • 考研失败, 学点Java打小工_Day3_卫语句_循环
  • 阿里云2核4G4M轻量应用服务器价格165元一年
  • [QJS xmake] 非常简单地在Windows下编译QuickJS!
  • MySQL数据自动同步到Es
  • 关系数据库:关系数据结构基础与概念解析
  • 代码随想录算法训练营第二十八天|93. 复原 IP 地址,78. 子集,90. 子集 II
  • GPT能复制人类的决策和直觉吗?
  • Vue async (type = 0) => {}代码讲解
  • 前端 - 基础 表单标签 -- 表单元素( input - type属性) 文本框和密码框