Netty笔记
本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频
学习netty之前要先打好NIO的基础,可以先去看我的另一篇文章
一、概述
不想看的可以直接跳过
Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
-
Cassandra - nosql 数据库
-
Spark - 大数据分布式计算框架
-
Hadoop - 大数据分布式存储框架
-
RocketMQ - ali 开源的消息队列
-
ElasticSearch - 搜索引擎
-
gRPC - rpc 框架
-
Dubbo - rpc 框架
-
Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
-
Zookeeper - 分布式协调框架
Netty 的优势
-
Netty vs NIO,工作量大,bug 多
-
需要自己构建协议
-
解决 TCP 传输问题,如粘包、半包
-
epoll 空轮询导致 CPU 100%
-
对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
-
-
Netty vs 其它网络应用框架
-
Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
-
久经考验,16年,Netty 版本
-
2.x 2004
-
3.x 2008
-
4.x 2013
-
5.x 已废弃(没有明显的性能提升,维护成本高)
-
-
二、入门案例
首次看netty的代码会比较乱,不要慌,多看看多学学,就会很熟悉的。
最重要的是要理解每一步的作用
开发一个简单的服务器端和客户端
-
客户端向服务器端发送 hello, world
-
服务器仅接收,不返回
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
服务端
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioServerSocketChannel.class) // 2
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder()); // 5
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080); // 4
-
1 处,创建 NioEventLoopGroup,可以简单理解为
线程池 + Selector
后面会详细展开 -
2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
-
3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
-
4 处,ServerSocketChannel 绑定的监听端口
-
5 处,SocketChannel 的处理器,解码 ByteBuf => String
-
6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<Channel>() { // 3
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
-
1 处,创建 NioEventLoopGroup,同 Server
-
2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
-
3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
-
4 处,指定要连接的服务器和端口
-
5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
-
6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
-
7 处,写入消息并清空缓冲区
-
8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
-
数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
流程分析
其实黑马使用链式编程对初学者来说并不友好,我下面对代码进行拆分
乍一看怎么全是ServerBootstrap,其实这些操作都是围绕着这个类在转的。
下一章组件 将会对每一个组件进行详细的分析,到时候就没有这么乱了
可以说明的是NioEventLoopGroup 是一个EventLoop的集合,EventLoop相当于NIO里面处理读写时间的工作者,都可以单开线程的。类似于NIO里面的多线程优化。
ServerBootstrap serverBootstrap = new ServerBootstrap();//得到启动类
NioEventLoopGroup group = new NioEventLoopGroup();// EventLoop的集合
ServerBootstrap serverBootstrap1 = serverBootstrap.group(group);// 将EventLoopGroup交给启动类
ServerBootstrap serverBootstrap2 = serverBootstrap1.channel(NioServerSocketChannel.class);// 指定channel的类型
ServerBootstrap serverBootstrap3 = serverBootstrap2.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 设置子通道(Channel)的处理器(ChannelHandler)的
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder()); // 添加处理器
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 添加自定义处理器
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap1.bind(8080);// 绑定端口
三、组件
1、EventLoop
我在前面提过,这就相当于处理读写事件的工作者。维护着自己的Selector。
我们之前提到过一个Selector能监听多个channel。一个服务端有多个Selector
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
-
一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法(这也就让他有处理定时任务的能力)
-
另一条线是继承自 netty 自己的 OrderedEventExecutor,
-
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
-
提供了 parent 方法来看看自己属于哪个 EventLoopGroup
-
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
-
继承自 netty 自己的 EventExecutorGroup
-
实现了 Iterable 接口提供遍历 EventLoop 的能力
-
另有 next 方法获取集合中下一个 EventLoop
-
可以自己指定group的大小,但是没有必要
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
结果:
io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
优雅关闭
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示 NioEventLoop 处理 io 事件
下面主要演示的是 工人是轮流工作的,但是对于同一个channel,多次来进行读写,为他服务的是同一个工作(也就是EventLoop)
服务器端两个 nio worker 工人
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
最后输出
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
2、channelFuture
3、future和promise
4、bytebuf
工具类:用于调试 展示bytebuf的内容
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
1、内存模式和池化
堆内存vs直接内存
堆内存 分配效率高,但读写效率低。直接内存反之。
使用ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
默认方式创建出来的是使用的直接内存
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
-
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
-
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
-
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
-
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
-
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
-
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
-
4.1 之前,池化功能还不成熟,默认是非池化实现
2、组成、写入、读取
相比于ByteBuffer的优点是①可扩容 ②使用读写指针,不用切换,逻辑更简单
读过的内存就废除 。
如果没有指定初始大小,默认是256个字节
扩容
再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容
buffer.writeInt(6); log(buffer);
扩容规则是
-
如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
-
如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
-
扩容不能超过 max capacity 会报错
使用mark配合reset实现读取多次,其实就是做个标记,然后跳回标记的地方。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1,2,3,4,5});
log(buffer);
buffer.markReaderIndex();
System.out.println(buffer.readByte());
buffer.resetReaderIndex();
System.out.println(buffer.readByte());
log(buffer);
}
3、slice 和composite
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
例,原始 ByteBuf 进行一些初始操作
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10); origin.writeBytes(new byte[]{1, 2, 3, 4}); origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
+-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
切片的正确使用
就是每个切片要执行retain,防止内存被释放。等到自己用完之后再执行release
composite合并
public static void main(String[] args) {
ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();
buffer1.writeBytes(new byte[]{1,2,3});
ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();
buffer2.writeBytes(new byte[]{4,5,6});
CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
compositeBuffer.addComponents(true,buffer1,buffer2);
log(compositeBuffer);
}
5、实现一个简单的回响功能
客户端发送什么,服务端就回复什么
这是 服务端的代码
package cn.itcast.mytest.homework;
import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
//回声服务器 返回客户端发送的消息
public class Server {
public static void main(String[] args) throws InterruptedException {
log.debug("启动中。。。");
new ServerBootstrap().group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("收到消息===>{}",msg);
nioSocketChannel.writeAndFlush(msg);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("发送消息===>{}",msg);
super.write(ctx, msg, promise);
}
});
}
}).bind(8080);
}
}
这是客户端
package cn.itcast.mytest.homework;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;
import java.util.Scanner;
@Slf4j
public class Client {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("收到消息==>{}",msg);
}
});
}
}).connect("localhost",8080).sync();
Scanner sc=new Scanner(System.in);
while(true){
System.out.println("请输入要发送的消息(输入q退出)");
String s = sc.nextLine();
if(!"q".equals(s)){
channelFuture.channel().writeAndFlush(s);
}else{
break;
}
}
log.debug("结束对话");
}
}
四、黏包和半包
1、黏包现象
所谓黏包现象,就是发送方在短时间内发送多条数据,接收方无法准确分辨出每个独立数据包的边界。这种情况常见于基于流的传输协议(如TCP),因为TCP是面向字节流的协议,数据在网络中以流的形式发送,而不是分包的形式。
就是多个数据包黏在一起了
channel建立成功之后,会触发active事件。
案例 :客户端连续发送10次16字节的数据,服务方接收数据之后打印出来,我们会发现,服务端试将这10条数据当成1条数据了。
接收方(服务端)
@Slf4j
public class HelloWorldServer {
static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
发送方(客户端)
@Slf4j
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
Random r = new Random();
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
2、半包现象
半包现象指的是在网络通信中,一个逻辑上的数据包被拆分成多个部分进行接收。这种现象通常发生在基于流的协议(如 TCP)中,由于 TCP 是面向字节流的协议,数据在传输时并不会被强制分包,而是以流的方式发送和接收。
案例 :客户端一次发送1600字节的数据,但是服务端一次只能接受1024,要将一份数据分成两份,出现了半包的现象。
@Slf4j
public class HelloWorldServer {
static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
@Slf4j
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 100; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
出现原因
本质是因为 TCP 是流式协议,消息无边界
滑动窗口
-
TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
-
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
-
图中深色的部分即要发送的数据,高亮的部分即窗口
-
窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
-
如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
-
接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
-
3、解决方案
(1)短连接
每一次接收完就断开连接,分10次发送
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的
(2)定长解码器
本质就是发送一系列消息,以最长的那个消息为固定值。然后将发送的消息都以这个固定的长度为准,如果长度不够就进行填充。
前提是服务端和客户端要约定好长度。
在服务端代码中要加上 将收到的数据进行解码
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
客户端代码
@Slf4j
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
服务端接收定长的数据
(3)行解码器
消息之间以换行符作为分隔符 LineBasedFrameDecoder
参数是指定最大长度,要是超过这个最大长度还没找到分隔符就报错
服务端加上
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端代码
package cn.itcast.mytest;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
@Slf4j
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
ByteBuf buffer = ctx.alloc().buffer();
char c='0';
Random r=new Random();
StringBuilder sb=new StringBuilder();
for (int i = 0; i < 10; i++) {
sb.setLength(0);
for(int j=0;j<r.nextInt(256);j++){
sb.append(c);
}
sb.append("\n");
buffer.writeBytes(sb.toString().getBytes());
c++;
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
这种方法性能低,用得比较少。
(4)LTC解码器
lengthFieldOffset 长度字段偏移量
lengthFieldLength 长度字段 的长度
lengthAdjustment 长度字段跳到正式内容的偏移值
initialBytesToStrip 消息剖离
例子1:
长度字段偏移量是0,因为一开始就是长度字段 0
长度字段占两个字节 2
长度字段之后跳0字节就是正式内容 0
消息不剖离 0
例子2:
消息头可能携带其他信息,比如版本号、协议。比如下面的hdr1、hdr2
长度字段偏移量是1 从开始到长度字段有1个字节 1
长度字段占两个字节 2
长度字段之后跳0字节就是正式内容 hdr2占1个字节 1
消息不剖离 3 从头到第3个字节的部分不要。
服务端加上 第一个参数是消息的最大长度
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024,0,4,0,0));
客户端代码
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
ByteBuf buffer = ctx.alloc().buffer();
String content="hello world";
//将内容加入到缓冲区
buffer.writeInt(content.length());//先写入长度
buffer.writeBytes(content.getBytes());
String content2="hi";
//将内容加入到缓冲区
buffer.writeInt(content2.length());//先写入长度
buffer.writeBytes(content2.getBytes());
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
注意:如果buffer中的消息长度不一致的时候,参数要以最长的消息制定
假如我将上面代码的长度字段改成2,也就是意味着长度字段所占的字节数只有2
hello world消息中 长度字段是00 00 00 0b,这样他只读取到00,消息长度才0
第二次读取到 00 0b 才会去真正读hello world
这是错误的 (int 占4字节)
五、协议设计与解析
1、redis
其实我们可以用过netty发送消息给redis,比如set name zhangsan
解析出来就是 下面这图
使用下面的代码可以向redis发送消息并得到响应
@Slf4j
public class testRedis {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 建立成功后,会触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) {
set(ctx);
get(ctx);
}
private void get(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*2".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("get".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
private void set(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("bbb".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
2、http
http协议相对来说比较复杂,好在netty已经帮我们封装了编解码器
我们只需要加上ch.pipeline().addLast(new HttpServerCodec());即可
首先我们先打印出消息的类型
浏览器访问http://localhost:8080/index.html之后(index.html可以换成其他的)
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.DefaultHttpRequest
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.LastHttpContent$1
我们会发现消息有两个,一个是请求头、请求行 ,另一个是请求体。这是http编解码器解析的
我们可以使用
if (msg instanceof HttpRequest) { // 请求行,请求头
} else if (msg instanceof HttpContent) { //请求体
}
进行判断。
或者我们可以使用 SimpleChannelInboundHandler只处理我们关心的消息类型
public static void main(String[] args) throws InterruptedException {
new ServerBootstrap().group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
log.debug("{}",msg.uri());
DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
byte[] m="<h1>Hello World</h1>".getBytes();
response.content().writeBytes(m);
response.headers().setInt(CONTENT_LENGTH,m.length);//消息头写入 正文的长度,这样浏览器就不用一直空转
ctx.writeAndFlush(response);
}
});
/* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}",msg.getClass());
}
});*/
}
}).bind(8080)
.sync().channel();
}
3、自定义
要素
-
魔数,用来在第一时间判定是否是无效数据包
-
版本号,可以支持协议的升级
-
序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
-
指令类型,是登录、注册、单聊、群聊... 跟业务相关
-
请求序号,为了双工通信,提供异步能力
-
正文长度
-
消息正文
编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
注意
channel能不能共享Handler。看源码有没有@Sharable注解,否则有线程安全问题
ByteToMessageCodec的子类不能标注为@Sharable
想要使用这个注解,就得继承MessageToMessageCodec,这个类默认是消息转化为消息,不会出现黏包和半包的情况。所以要使用LengthFieldBasedFrameDecoder来确保不会出现黏包半包的情况
@ChannelHandler.Sharable /** * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的 */ public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message>
自定义类没有保存状态就是线程安全的
六、聊天业务
1、登录功能
使用最简单的控制台输入,还有固定数据
注意:全是采用我们之前自定义的编解码器
MessageCodecSharable 相关资料,我上传上去了
服务端代码
添加后处理器 在连接建立之后执行校验用户名、密码的功能
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器
ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean b = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage result =null;
if(b){
//登录成功
result = new LoginResponseMessage(true, "登录成功");
}else{
result = new LoginResponseMessage(false, "用户名或密码错误");
}
ctx.writeAndFlush(result);
}
});
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端代码
连接建立之后,控制台输入用户名、密码。发送给客服端
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); //打印日志
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(()->{
Scanner sc=new Scanner(System.in);
System.out.println("请输入用户名:");
String username = sc.nextLine();
System.out.println("请输入密码:");
String password = sc.nextLine();
LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
ctx.writeAndFlush(loginRequestMessage);
}).start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
线程通信
我们使用CountDownLatch来处理 登陆成功和登陆失败的情况,实现
两个线程在channelRead和channelActive之间的通信
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); //打印日志
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码
CountDownLatch countDownLatch=new CountDownLatch(1);//到0表示可以继续运行
AtomicBoolean b=new AtomicBoolean(false);//响应是成功还是失败
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(()->{
Scanner sc=new Scanner(System.in);
System.out.println("请输入用户名:");
String username = sc.nextLine();
System.out.println("请输入密码:");
String password = sc.nextLine();
LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
ctx.writeAndFlush(loginRequestMessage);
try {
countDownLatch.await();//线程阻塞在这里
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(!b.get()){
//如果登录失败。关闭连接
ctx.channel().close();
}
log.debug("=================菜单=================");
}).start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx,msg);
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
if (responseMessage.isSuccess()) {
//登录成功
b.set(true);
}else{
b.set(false);
}
countDownLatch.countDown();
}
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
2、业务消息发送
没什么,看看就行
while(true){
System.out.println("=======================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("=======================================");
String choice = sc.nextLine();
String[] s = choice.split(" ");
switch (s[0]){
case "send": ctx.writeAndFlush(new ChatRequestMessage(username,s[1],s[2]));
break;
case "gsend": ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));
break;
case "gcreate":
String[] members = s[2].split(",");
Set<String> set=new HashSet<>( Arrays.asList(members));
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));
break;
case "quit":
ctx.channel().close();
return ;
}
}
3、单聊消息处理
为了让我们的代码更清晰
我们先对匿名内部类转化为内部类
再转化为外部类
第一步
模仿上面的结构编写自定义ChatRequestMessageHandler,我们写在单独的类下
然后使用addLast加入到channel
package cn.itcast.server.Handler;
import cn.itcast.message.ChatRequestMessage;
import cn.itcast.message.ChatResponseMessage;
import cn.itcast.server.service.UserServiceFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
//处理单聊的业务
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {
String from = msg.getFrom();
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);//查看对方是否在线
if(channel!=null){
//在线 发送给接收方
channel.writeAndFlush(new ChatResponseMessage(from,msg.getContent()));
}else{
//发送给 发送方
channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"用户不存在或者用户不在线"));
}
}
}
这是服务端的代码
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ChatRequestMessageHandler CHAT_HANDLER =new ChatRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器
ch.pipeline().addLast(LOGIN_HANDLER);
ch.pipeline().addLast(CHAT_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
4、群聊创建
在服务端代码中加上这个handler,这里就不做演示了。
package cn.itcast.server.Handler;
import cn.itcast.message.GroupCreateRequestMessage;
import cn.itcast.message.GroupCreateResponseMessage;
import cn.itcast.server.session.Group;
import cn.itcast.server.session.GroupSession;
import cn.itcast.server.session.GroupSessionFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
import java.util.Set;
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);//不存在的话,创建并返回null
if(group==null){
ctx.writeAndFlush(new GroupCreateResponseMessage(true,"群聊创检查成功!"));
//创建成功
List<Channel> membersChannel =
groupSession.getMembersChannel(groupName);
for (Channel channel : membersChannel) {
channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入"+groupName));
}
}else{
//创建失败
ctx.writeAndFlush(new GroupCreateResponseMessage(false,"群聊已存在"));
}
}
}
这里有一个缺陷,就是群主并没有进去群聊。所以要在客户端加上
set.add(username);
其他业务就不一一写了,都是编写自定义的Handler。
5、空闲检测、心跳
连接假死:可能因为网络的原因,连接已经断开了。
我们可以做空闲检测,就是服务端和客户端定时检测是否还有消息传输
如果超过5秒就判断连接假死,但是这样是不合理的,客户端可能没有及时发送消息导致连接被断开。 我们可以在客户端进行定时发送心跳,如果心跳没有传输到服务端可以证明连接确实出现问题。如果能传输到,服务端也不会断开连接;
这里要介绍netty提供的IdleStateHandler,他有三个参数
参数1:读空闲时间 参数2:写空闲时间 参数3:读写空闲时间
时间到了之后会触发响应的事件
我们可以使用ChannelDuplexHandler的userEventTriggered方法来处理各种事件
客户端加上
ch.pipeline().addLast(new IdleStateHandler(0,3,0));
ch.pipeline().addLast(new ChannelDuplexHandler(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if(event.state()== IdleState.WRITER_IDLE){
log.debug("心跳");
ctx.writeAndFlush(new PingMessage());
}
}
});
服务端加上
//参数1:读空闲时间 参数2:写空闲时间 参数3:读写空闲时间
ch.pipeline().addLast(new IdleStateHandler(5,0,0));
ch.pipeline().addLast(new ChannelDuplexHandler(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if(event==IdleStateEvent.READER_IDLE_STATE_EVENT){
log.debug("5秒没有读事件");
ctx.channel().close();
}
}
});
如果客户端还是断开了,是因为这里有个bug,等下一章才学习到。