im-system学习
文章目录
- LimServer
- LimServer
- snakeyaml
- 依赖
- 使用
- 配置类
- 配置文件
- 私有协议解码
- MessageDecoder
- ByteBufToMessageUtils
这个很全: IM即时通讯系统[SpringBoot+Netty]——梳理(总)
IO线程模型
Redis 分布式客户端 Redisson 分布式锁快速入门
LimServer
public class LimServer {
private final static Logger logger = LoggerFactory.getLogger(LimServer.class);
BootstrapConfig.TcpConfig config;
EventLoopGroup mainGroup;
EventLoopGroup subGroup;
ServerBootstrap server;
public LimServer(BootstrapConfig.TcpConfig config) {
this.config = config;
mainGroup = new NioEventLoopGroup(config.getBossThreadSize());
subGroup = new NioEventLoopGroup(config.getWorkThreadSize());
server = new ServerBootstrap();
server.group(mainGroup,subGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEncoder());
// ch.pipeline().addLast(new IdleStateHandler(
// 0, 0,
// 10));
ch.pipeline().addLast(new HeartBeatHandler(config.getHeartBeatTime()));
ch.pipeline().addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
}
});
}
public void start(){
this.server.bind(this.config.getTcpPort());
}
}
LimServer
public class LimWebSocketServer {
private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);
BootstrapConfig.TcpConfig config;
EventLoopGroup mainGroup;
EventLoopGroup subGroup;
ServerBootstrap server;
public LimWebSocketServer(BootstrapConfig.TcpConfig config) {
this.config = config;
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast("http-codec", new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
// 几乎在netty中的编程,都会使用到此hanler
pipeline. addLast("aggregator", new HttpObjectAggregator(65535));
/**
* websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
* 本handler会帮你处理一些繁重的复杂的事
* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new WebSocketMessageDecoder());
pipeline.addLast(new WebSocketMessageEncoder());
pipeline.addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
}
});
}
public void start(){
this.server.bind(this.config.getWebSocketPort());
}
}
snakeyaml
依赖
<!-- yaml -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
使用
private static void start(String path){
try {
Yaml yaml = new Yaml();
InputStream inputStream = new FileInputStream(path);
BootstrapConfig bootstrapConfig = yaml.loadAs(inputStream, BootstrapConfig.class);
new LimServer(bootstrapConfig.getLim()).start();
new LimWebSocketServer(bootstrapConfig.getLim()).start();
RedisManager.init(bootstrapConfig);
MqFactory.init(bootstrapConfig.getLim().getRabbitmq());
MessageReciver.init(bootstrapConfig.getLim().getBrokerId()+"");
registerZK(bootstrapConfig);
}catch (Exception e){
e.printStackTrace();
System.exit(500);
}
}
配置类
@Data
public class BootstrapConfig {
private TcpConfig lim;
@Data
public static class TcpConfig {
private Integer tcpPort;// tcp 绑定的端口号
private Integer webSocketPort; // webSocket 绑定的端口号
private boolean enableWebSocket; //是否启用webSocket
private Integer bossThreadSize; // boss线程 默认=1
private Integer workThreadSize; //work线程
private Long heartBeatTime; //心跳超时时间 单位毫秒
private Integer loginModel;
/**
* redis配置
*/
private RedisConfig redis;
/**
* rabbitmq配置
*/
private Rabbitmq rabbitmq;
/**
* zk配置
*/
private ZkConfig zkConfig;
/**
* brokerId
*/
private Integer brokerId;
private String logicUrl;
}
@Data
public static class ZkConfig {
/**
* zk连接地址
*/
private String zkAddr;
/**
* zk连接超时时间
*/
private Integer zkConnectTimeOut;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class RedisConfig {
/**
* 单机模式:single 哨兵模式:sentinel 集群模式:cluster
*/
private String mode;
/**
* 数据库
*/
private Integer database;
/**
* 密码
*/
private String password;
/**
* 超时时间
*/
private Integer timeout;
/**
* 最小空闲数
*/
private Integer poolMinIdle;
/**
* 连接超时时间(毫秒)
*/
private Integer poolConnTimeout;
/**
* 连接池大小
*/
private Integer poolSize;
/**
* redis单机配置
*/
private RedisSingle single;
}
/**
* redis单机配置
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class RedisSingle {
/**
* 地址
*/
private String address;
}
/**
* rabbitmq哨兵模式配置
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class Rabbitmq {
private String host;
private Integer port;
private String virtualHost;
private String userName;
private String password;
}
}
配置文件
lim:
tcpPort: 9000
webSocketPort: 19000
bossThreadSize: 1
workThreadSize: 8
heartBeatTime: 20000 #心跳超时时间 单位毫秒
brokerId: 1000
loginModel: 3
logicUrl: http://127.0.0.1:8000/v1
# * 多端同步模式:1 只允许一端在线,手机/电脑/web 踢掉除了本client+imel的设备
# * 2 允许手机/电脑的一台设备 + web在线 踢掉除了本client+imel的非web端设备
# * 3 允许手机和电脑单设备 + web 同时在线 踢掉非本client+imel的同端设备
# * 4 允许所有端多设备登录 不踢任何设备
redis:
mode: single # 单机模式:single 哨兵模式:sentinel 集群模式:cluster
database: 0
password:
timeout: 3000 # 超时时间
poolMinIdle: 8 #最小空闲数
poolConnTimeout: 3000 # 连接超时时间(毫秒)
poolSize: 10 # 连接池大小
single: #redis单机配置
address: 127.0.0.1:6379
rabbitmq:
host: 127.0.0.1
port: 5672
virtualHost: /
userName: guest
password: guest
zkConfig:
zkAddr: 127.0.0.1:2181
zkConnectTimeOut: 5000
私有协议解码
MessageDecoder
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
//请求头(指令
// 版本
// clientType
// 消息解析类型
// appId
// imei长度
// bodylen)+ imei号 + 请求体
if(in.readableBytes() < 28){
return;
}
Message message = ByteBufToMessageUtils.transition(in);
if(message == null){
return;
}
out.add(message);
}
}
ByteBufToMessageUtils
/**
* @author: Chackylee
* @description: 将ByteBuf转化为Message实体,根据私有协议转换
* 私有协议规则,
* 4位表示Command表示消息的开始,
* 4位表示version
* 4位表示clientType
* 4位表示messageType
* 4位表示appId
* 4位表示imei长度
* imei
* 4位表示数据长度
* data
* 后续将解码方式加到数据头根据不同的解码方式解码,如pb,json,现在用json字符串
* @version: 1.0
*/
public class ByteBufToMessageUtils {
public static Message transition(ByteBuf in){
/** 获取command*/
int command = in.readInt();
/** 获取version*/
int version = in.readInt();
/** 获取clientType*/
int clientType = in.readInt();
/** 获取clientType*/
int messageType = in.readInt();
/** 获取appId*/
int appId = in.readInt();
/** 获取imeiLength*/
int imeiLength = in.readInt();
/** 获取bodyLen*/
int bodyLen = in.readInt();
if(in.readableBytes() < bodyLen + imeiLength){
in.resetReaderIndex();
return null;
}
byte [] imeiData = new byte[imeiLength];
in.readBytes(imeiData);
String imei = new String(imeiData);
byte [] bodyData = new byte[bodyLen];
in.readBytes(bodyData);
MessageHeader messageHeader = new MessageHeader();
messageHeader.setAppId(appId);
messageHeader.setClientType(clientType);
messageHeader.setCommand(command);
messageHeader.setLength(bodyLen);
messageHeader.setVersion(version);
messageHeader.setMessageType(messageType);
messageHeader.setImei(imei);
Message message = new Message();
message.setMessageHeader(messageHeader);
if(messageType == 0x0){
String body = new String(bodyData);
JSONObject parse = (JSONObject) JSONObject.parse(body);
message.setMessagePack(parse);
}
in.markReaderIndex();
return message;
}
}