当前位置: 首页 > article >正文

Netty笔记

本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频

学习netty之前要先打好NIO的基础,可以先去看我的另一篇文章

一、概述

不想看的可以直接跳过

Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库

  • Spark - 大数据分布式计算框架

  • Hadoop - 大数据分布式存储框架

  • RocketMQ - ali 开源的消息队列

  • ElasticSearch - 搜索引擎

  • gRPC - rpc 框架

  • Dubbo - rpc 框架

  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端

  • Zookeeper - 分布式协调框架

Netty 的优势

  • Netty vs NIO,工作量大,bug 多

    • 需要自己构建协议

    • 解决 TCP 传输问题,如粘包、半包

    • epoll 空轮询导致 CPU 100%

    • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

  • Netty vs 其它网络应用框架

    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

    • 久经考验,16年,Netty 版本

      • 2.x 2004

      • 3.x 2008

      • 4.x 2013

      • 5.x 已废弃(没有明显的性能提升,维护成本高)

二、入门案例

首次看netty的代码会比较乱,不要慌,多看看多学学,就会很熟悉的。
最重要的是要理解每一步的作用

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world

  • 服务器仅接收,不返回

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

服务端

new ServerBootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioServerSocketChannel.class) // 2
    .childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringDecoder()); // 5
            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                    System.out.println(msg);
                }
            });
        }
    })
    .bind(8080); // 4

  • 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开

  • 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有

  • 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

  • 4 处,ServerSocketChannel 绑定的监听端口

  • 5 处,SocketChannel 的处理器,解码 ByteBuf => String

  • 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

客户端

new Bootstrap()
    .group(new NioEventLoopGroup()) // 1
    .channel(NioSocketChannel.class) // 2
    .handler(new ChannelInitializer<Channel>() { // 3
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder()); // 8
        }
    })
    .connect("127.0.0.1", 8080) // 4
    .sync() // 5
    .channel() // 6
    .writeAndFlush(new Date() + ": hello world!"); // 7
  • 1 处,创建 NioEventLoopGroup,同 Server

  • 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有

  • 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

  • 4 处,指定要连接的服务器和端口

  • 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕

  • 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作

  • 7 处,写入消息并清空缓冲区

  • 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出

  • 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

流程分析

其实黑马使用链式编程对初学者来说并不友好,我下面对代码进行拆分

乍一看怎么全是ServerBootstrap,其实这些操作都是围绕着这个类在转的。

下一章组件 将会对每一个组件进行详细的分析,到时候就没有这么乱了

可以说明的是NioEventLoopGroup 是一个EventLoop的集合,EventLoop相当于NIO里面处理读写时间的工作者,都可以单开线程的。类似于NIO里面的多线程优化。

  ServerBootstrap serverBootstrap = new ServerBootstrap();//得到启动类
        NioEventLoopGroup group = new NioEventLoopGroup();// EventLoop的集合
        ServerBootstrap serverBootstrap1 = serverBootstrap.group(group);// 将EventLoopGroup交给启动类
        ServerBootstrap serverBootstrap2 = serverBootstrap1.channel(NioServerSocketChannel.class);// 指定channel的类型

        ServerBootstrap serverBootstrap3 = serverBootstrap2.childHandler(new ChannelInitializer<NioSocketChannel>() {
            // 设置子通道(Channel)的处理器(ChannelHandler)的
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new StringDecoder()); // 添加处理器
                ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 添加自定义处理器
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                        System.out.println(msg);
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap1.bind(8080);// 绑定端口

三、组件 

1、EventLoop

我在前面提过,这就相当于处理读写事件的工作者。维护着自己的Selector。

我们之前提到过一个Selector能监听多个channel。一个服务端有多个Selector

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法(这也就让他有处理定时任务的能力)

  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,

    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop

    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力

    • 另有 next 方法获取集合中下一个 EventLoop

可以自己指定group的大小,但是没有必要

// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

结果: 

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98

优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

演示 NioEventLoop 处理 io 事件

下面主要演示的是 工人是轮流工作的,但是对于同一个channel,多次来进行读写,为他服务的是同一个工作(也就是EventLoop)

服务器端两个 nio worker 工人

new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

最后输出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi           
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu        
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu         

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定

2、channelFuture

3、future和promise

4、bytebuf

工具类:用于调试 展示bytebuf的内容

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

1、内存模式和池化

堆内存vs直接内存

堆内存 分配效率高,但读写效率低。直接内存反之。

使用ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
默认方式创建出来的是使用的直接内存

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用

  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力

  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率

  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现

  • 4.1 之前,池化功能还不成熟,默认是非池化实现

2、组成、写入、读取

相比于ByteBuffer的优点是①可扩容  ②使用读写指针,不用切换,逻辑更简单

读过的内存就废除 。

如果没有指定初始大小,默认是256个字节

扩容

再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容

buffer.writeInt(6);
log(buffer);

扩容规则是

  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16

  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)

  • 扩容不能超过 max capacity 会报错

使用mark配合reset实现读取多次,其实就是做个标记,然后跳回标记的地方。

 public static void main(String[] args) throws ExecutionException, InterruptedException {

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
        buffer.writeBytes(new byte[]{1,2,3,4,5});
        log(buffer);
        buffer.markReaderIndex();
        System.out.println(buffer.readByte());
       
        buffer.resetReaderIndex();
        System.out.println(buffer.readByte());

        log(buffer);
    }

3、slice  和composite

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

例,原始 ByteBuf 进行一些初始操作

ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

切片的正确使用

就是每个切片要执行retain,防止内存被释放。等到自己用完之后再执行release

composite合并

public static void main(String[] args) {
        ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();
        buffer1.writeBytes(new byte[]{1,2,3});
        ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();
        buffer2.writeBytes(new byte[]{4,5,6});

        CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
        compositeBuffer.addComponents(true,buffer1,buffer2);
        log(compositeBuffer);
    }

5、实现一个简单的回响功能

客户端发送什么,服务端就回复什么

这是 服务端的代码

package cn.itcast.mytest.homework;

import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
//回声服务器  返回客户端发送的消息
public class Server {
    public static void main(String[] args) throws InterruptedException {
        log.debug("启动中。。。");
       new ServerBootstrap().group(new NioEventLoopGroup())
               .channel(NioServerSocketChannel.class)
               .childHandler(new ChannelInitializer<NioSocketChannel>() {
                   @Override
                   protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                       nioSocketChannel.pipeline().addLast(new StringDecoder());
                       nioSocketChannel.pipeline().addLast(new StringEncoder());
                       nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                           @Override
                           public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                               log.debug("收到消息===>{}",msg);
                               nioSocketChannel.writeAndFlush(msg);
                           }
                       });
                       nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                           @Override
                           public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                               log.debug("发送消息===>{}",msg);
                               super.write(ctx, msg, promise);
                           }
                       });
                   }
               }).bind(8080);
    }
}

这是客户端

package cn.itcast.mytest.homework;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;

import java.util.Scanner;

@Slf4j
public class Client {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("收到消息==>{}",msg);
                            }
                        });
                    }
                }).connect("localhost",8080).sync();
        Scanner sc=new Scanner(System.in);

        while(true){
            System.out.println("请输入要发送的消息(输入q退出)");
            String s = sc.nextLine();
            if(!"q".equals(s)){

                channelFuture.channel().writeAndFlush(s);
            }else{
                break;
            }
        }
        log.debug("结束对话");



    }
}

四、黏包和半包

1、黏包现象

所谓黏包现象,就是发送方在短时间内发送多条数据,接收方无法准确分辨出每个独立数据包的边界。这种情况常见于基于流的传输协议(如TCP),因为TCP是面向字节流的协议,数据在网络中以流的形式发送,而不是分包的形式。


就是多个数据包黏在一起了

channel建立成功之后,会触发active事件。

案例 :客户端连续发送10次16字节的数据,服务方接收数据之后打印出来,我们会发现,服务端试将这10条数据当成1条数据了。

接收方(服务端)


@Slf4j
public class HelloWorldServer {
    static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("connected {}", ctx.channel());
                            super.channelActive(ctx);
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("disconnect {}", ctx.channel());
                            super.channelInactive(ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{} binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{} bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }
}

发送方(客户端)

@Slf4j
public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            Random r = new Random();
                            char c = 'a';
                            for (int i = 0; i < 10; i++) {
                                ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                                ctx.writeAndFlush(buffer);
                            }
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

2、半包现象 

半包现象指的是在网络通信中,一个逻辑上的数据包被拆分成多个部分进行接收。这种现象通常发生在基于流的协议(如 TCP)中,由于 TCP 是面向字节流的协议,数据在传输时并不会被强制分包,而是以流的方式发送和接收。

案例 :客户端一次发送1600字节的数据,但是服务端一次只能接受1024,要将一份数据分成两份,出现了半包的现象。


@Slf4j
public class HelloWorldServer {
    static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("connected {}", ctx.channel());
                            super.channelActive(ctx);
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("disconnect {}", ctx.channel());
                            super.channelInactive(ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{} binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{} bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }
}

@Slf4j
public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            ByteBuf buffer = ctx.alloc().buffer();
                            for (int i = 0; i < 100; i++) {
                                buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                            }
                            ctx.writeAndFlush(buffer);

                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

 

出现原因 

本质是因为 TCP 是流式协议,消息无边界

滑动窗口

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差

为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

    • 图中深色的部分即要发送的数据,高亮的部分即窗口

    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动

    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动

    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

3、解决方案

(1)短连接  

每一次接收完就断开连接,分10次发送

public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            send();
        }

    }

    private static void send() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            ByteBuf buffer = ctx.alloc().buffer();

                                buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});

                            ctx.writeAndFlush(buffer);
                            ctx.close();

                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

 半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的

(2)定长解码器

本质就是发送一系列消息,以最长的那个消息为固定值。然后将发送的消息都以这个固定的长度为准,如果长度不够就进行填充。

前提是服务端和客户端要约定好长度。

在服务端代码中要加上  将收到的数据进行解码

ch.pipeline().addLast(new FixedLengthFrameDecoder(8));

客户端代码 

@Slf4j
public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            // 发送内容随机的数据包
                            Random r = new Random();
                            char c = 'a';
                            ByteBuf buffer = ctx.alloc().buffer();
                            for (int i = 0; i < 10; i++) {
                                byte[] bytes = new byte[8];
                                for (int j = 0; j < r.nextInt(8); j++) {
                                    bytes[j] = (byte) c;
                                }
                                c++;
                                buffer.writeBytes(bytes);
                            }
                            ctx.writeAndFlush(buffer);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

服务端接收定长的数据

(3)行解码器

消息之间以换行符作为分隔符  LineBasedFrameDecoder

参数是指定最大长度,要是超过这个最大长度还没找到分隔符就报错

 服务端加上

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

客户端代码

package cn.itcast.mytest;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;
@Slf4j
public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            // 发送内容随机的数据包

                            ByteBuf buffer = ctx.alloc().buffer();
                            char c='0';
                            Random r=new Random();
                            StringBuilder sb=new StringBuilder();
                            for (int i = 0; i < 10; i++) {
                                sb.setLength(0);
                                for(int j=0;j<r.nextInt(256);j++){
                                    sb.append(c);
                                }
                                sb.append("\n");
                                buffer.writeBytes(sb.toString().getBytes());
                                c++;
                            }
                            ctx.writeAndFlush(buffer);


                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }


}

 

这种方法性能低,用得比较少。

(4)LTC解码器 

 lengthFieldOffset   长度字段偏移量
  lengthFieldLength    长度字段 的长度
  lengthAdjustment    长度字段跳到正式内容的偏移值
  initialBytesToStrip   消息剖离

例子1:

长度字段偏移量是0,因为一开始就是长度字段    0
长度字段占两个字节  2
长度字段之后跳0字节就是正式内容   0
消息不剖离  0

例子2:

消息头可能携带其他信息,比如版本号、协议。比如下面的hdr1、hdr2

长度字段偏移量是1  从开始到长度字段有1个字节    1 
长度字段占两个字节  2
长度字段之后跳0字节就是正式内容   hdr2占1个字节    1
消息不剖离  3    从头到第3个字节的部分不要。

服务端加上   第一个参数是消息的最大长度

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder
        (1024,0,4,0,0));

客户端代码 

public class HelloWorldClient {
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            // 发送内容随机的数据包

                            ByteBuf buffer = ctx.alloc().buffer();
                            String content="hello world";
                            //将内容加入到缓冲区
                            buffer.writeInt(content.length());//先写入长度
                            buffer.writeBytes(content.getBytes());

                            String content2="hi";
                            //将内容加入到缓冲区
                            buffer.writeInt(content2.length());//先写入长度
                            buffer.writeBytes(content2.getBytes());

                            ctx.writeAndFlush(buffer);


                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }


}

 

注意:如果buffer中的消息长度不一致的时候,参数要以最长的消息制定

假如我将上面代码的长度字段改成2,也就是意味着长度字段所占的字节数只有2
hello world消息中 长度字段是00 00 00 0b,这样他只读取到00,消息长度才0
第二次读取到 00 0b 才会去真正读hello world
这是错误的   (int 占4字节)

五、协议设计与解析 

1、redis

其实我们可以用过netty发送消息给redis,比如set name zhangsan
解析出来就是 下面这图

使用下面的代码可以向redis发送消息并得到响应 

@Slf4j
public class testRedis {
    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        byte[] LINE = {13, 10};
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        // 会在连接 channel 建立成功后,会触发 active 事件
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            set(ctx);
                            get(ctx);
                        }
                        private void get(ChannelHandlerContext ctx) {
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*2".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("get".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("aaa".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }
                        private void set(ChannelHandlerContext ctx) {
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("set".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("aaa".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("bbb".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            System.out.println(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

2、http

http协议相对来说比较复杂,好在netty已经帮我们封装了编解码器

我们只需要加上ch.pipeline().addLast(new HttpServerCodec());即可

首先我们先打印出消息的类型

浏览器访问http://localhost:8080/index.html之后(index.html可以换成其他的)

11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.DefaultHttpRequest
11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.LastHttpContent$1 

我们会发现消息有两个,一个是请求头、请求行  ,另一个是请求体。这是http编解码器解析的

我们可以使用

 if (msg instanceof HttpRequest) { // 请求行,请求头

                    } else if (msg instanceof HttpContent) { //请求体

                    }

进行判断。


或者我们可以使用 SimpleChannelInboundHandler只处理我们关心的消息类型

 public static void main(String[] args) throws InterruptedException {

        new ServerBootstrap().group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)

                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new HttpServerCodec());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                                log.debug("{}",msg.uri());
                                DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                byte[] m="<h1>Hello World</h1>".getBytes();
                                response.content().writeBytes(m);
                                response.headers().setInt(CONTENT_LENGTH,m.length);//消息头写入 正文的长度,这样浏览器就不用一直空转
                                ctx.writeAndFlush(response);
                            }
                        });
                       /* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("{}",msg.getClass());
                            }
                        });*/


                    }
                }).bind(8080)
                .sync().channel();

    }

3、自定义

要素

  • 魔数,用来在第一时间判定是否是无效数据包

  • 版本号,可以支持协议的升级

  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk

  • 指令类型,是登录、注册、单聊、群聊... 跟业务相关

  • 请求序号,为了双工通信,提供异步能力

  • 正文长度

  • 消息正文

编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}

注意

channel能不能共享Handler。看源码有没有@Sharable注解,否则有线程安全问题

ByteToMessageCodec的子类不能标注为@Sharable

想要使用这个注解,就得继承MessageToMessageCodec,这个类默认是消息转化为消息,不会出现黏包和半包的情况。所以要使用LengthFieldBasedFrameDecoder来确保不会出现黏包半包的情况

@ChannelHandler.Sharable
/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> 

自定义类没有保存状态就是线程安全的

六、聊天业务

1、登录功能

使用最简单的控制台输入,还有固定数据

注意:全是采用我们之前自定义的编解码器

MessageCodecSharable   
相关资料,我上传上去了

服务端代码

添加后处理器 在连接建立之后执行校验用户名、密码的功能


@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
                            String username = msg.getUsername();
                            String password = msg.getPassword();
                            boolean b = UserServiceFactory.getUserService().login(username, password);
                            LoginResponseMessage result =null;
                            if(b){
                                //登录成功
                                 result = new LoginResponseMessage(true, "登录成功");
                            }else{
                                 result = new LoginResponseMessage(false, "用户名或密码错误");
                            }
                             ctx.writeAndFlush(result);
                        }
                    });
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端代码 

连接建立之后,控制台输入用户名、密码。发送给客服端


@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);  //打印日志
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            new Thread(()->{
                                Scanner sc=new Scanner(System.in);
                                System.out.println("请输入用户名:");
                                String username = sc.nextLine();
                                System.out.println("请输入密码:");
                                String password = sc.nextLine();
                                LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
                                ctx.writeAndFlush(loginRequestMessage);
                            }).start();

                        }
                    });
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

线程通信

我们使用CountDownLatch来处理 登陆成功和登陆失败的情况,实现

两个线程在channelRead和channelActive之间的通信

@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);  //打印日志
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();//自定义消息协议 编解码
        CountDownLatch countDownLatch=new CountDownLatch(1);//到0表示可以继续运行
        AtomicBoolean b=new AtomicBoolean(false);//响应是成功还是失败
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            new Thread(()->{
                                Scanner sc=new Scanner(System.in);
                                System.out.println("请输入用户名:");
                                String username = sc.nextLine();
                                System.out.println("请输入密码:");
                                String password = sc.nextLine();
                                LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
                                ctx.writeAndFlush(loginRequestMessage);
                                try {
                                    countDownLatch.await();//线程阻塞在这里
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                                if(!b.get()){
                                    //如果登录失败。关闭连接
                                    ctx.channel().close();
                                }
                                log.debug("=================菜单=================");
                            }).start();
                        }

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                            super.channelRead(ctx,msg);
                            if (msg instanceof LoginResponseMessage) {
                                LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
                                if (responseMessage.isSuccess()) {
                                    //登录成功
                                    b.set(true);


                                }else{

                                    b.set(false);
                                }
                                countDownLatch.countDown();
                            }
                        }
                    });

                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

2、业务消息发送

没什么,看看就行

while(true){
                                   System.out.println("=======================================");
                                   System.out.println("send [username] [content]");
                                   System.out.println("gsend [group name] [content]");
                                   System.out.println("gcreate [group name] [m1,m2,m3...]");
                                   System.out.println("gmembers [group name]");
                                   System.out.println("gjoin [group name]");
                                   System.out.println("gquit [group name]");
                                   System.out.println("quit");
                                   System.out.println("=======================================");
                                   String choice = sc.nextLine();
                                   String[] s = choice.split(" ");
                                   switch (s[0]){
                                       case "send":  ctx.writeAndFlush(new ChatRequestMessage(username,s[1],s[2]));
                                           break;
                                       case "gsend": ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));
                                           break;
                                       case "gcreate":
                                           String[] members = s[2].split(",");
                                          Set<String> set=new HashSet<>( Arrays.asList(members));
                                           ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],set));
                                           break;
                                       case "gmembers":
                                           ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                           break;
                                       case "gjoin":
                                           ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));
                                           break;
                                       case "gquit":
                                           ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));
                                           break;
                                       case "quit":
                                           ctx.channel().close();
                                           return ;
                                   }
                               }

3、单聊消息处理

为了让我们的代码更清晰

我们先对匿名内部类转化为内部类

 再转化为外部类

第一步

模仿上面的结构编写自定义ChatRequestMessageHandler,我们写在单独的类下

然后使用addLast加入到channel

package cn.itcast.server.Handler;

import cn.itcast.message.ChatRequestMessage;
import cn.itcast.message.ChatResponseMessage;
import cn.itcast.server.service.UserServiceFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

//处理单聊的业务
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {
        String from = msg.getFrom();
        String to = msg.getTo();

        Channel channel = SessionFactory.getSession().getChannel(to);//查看对方是否在线
        if(channel!=null){
            //在线  发送给接收方
            channel.writeAndFlush(new ChatResponseMessage(from,msg.getContent()));
        }else{
            //发送给  发送方
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"用户不存在或者用户不在线"));
        }

    }
}

这是服务端的代码


@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
        ChatRequestMessageHandler CHAT_HANDLER =new ChatRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器
                    ch.pipeline().addLast(LOGIN_HANDLER);
                    ch.pipeline().addLast(CHAT_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

4、群聊创建

在服务端代码中加上这个handler,这里就不做演示了。

package cn.itcast.server.Handler;

import cn.itcast.message.GroupCreateRequestMessage;
import cn.itcast.message.GroupCreateResponseMessage;
import cn.itcast.server.session.Group;
import cn.itcast.server.session.GroupSession;
import cn.itcast.server.session.GroupSessionFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;
import java.util.Set;

@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();

        Set<String> members = msg.getMembers();

        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);//不存在的话,创建并返回null
        if(group==null){
            ctx.writeAndFlush(new GroupCreateResponseMessage(true,"群聊创检查成功!"));
            //创建成功
            List<Channel> membersChannel =
                    groupSession.getMembersChannel(groupName);
            for (Channel channel : membersChannel) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入"+groupName));
            }

        }else{
            //创建失败
            ctx.writeAndFlush(new GroupCreateResponseMessage(false,"群聊已存在"));

        }

    }
}

这里有一个缺陷,就是群主并没有进去群聊。所以要在客户端加上

set.add(username);

其他业务就不一一写了,都是编写自定义的Handler。

5、空闲检测、心跳

连接假死:可能因为网络的原因,连接已经断开了。

我们可以做空闲检测,就是服务端和客户端定时检测是否还有消息传输

如果超过5秒就判断连接假死,但是这样是不合理的,客户端可能没有及时发送消息导致连接被断开。   我们可以在客户端进行定时发送心跳,如果心跳没有传输到服务端可以证明连接确实出现问题。如果能传输到,服务端也不会断开连接;

这里要介绍netty提供的IdleStateHandler,他有三个参数

参数1:读空闲时间  参数2:写空闲时间  参数3:读写空闲时间

时间到了之后会触发响应的事件

我们可以使用ChannelDuplexHandler的userEventTriggered方法来处理各种事件

客户端加上
 ch.pipeline().addLast(new IdleStateHandler(0,3,0));
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

                            IdleStateEvent event = (IdleStateEvent) evt;
                            if(event.state()== IdleState.WRITER_IDLE){
                                log.debug("心跳");
                                ctx.writeAndFlush(new PingMessage());
                            }
                        }
                    });


服务端加上


 //参数1:读空闲时间  参数2:写空闲时间  参数3:读写空闲时间
                    ch.pipeline().addLast(new IdleStateHandler(5,0,0));
                    ch.pipeline().addLast(new ChannelDuplexHandler(){
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

                            IdleStateEvent event = (IdleStateEvent) evt;
                            if(event==IdleStateEvent.READER_IDLE_STATE_EVENT){
                                log.debug("5秒没有读事件");
                                ctx.channel().close();
                            }
                        }
                    });

如果客户端还是断开了,是因为这里有个bug,等下一章才学习到。


http://www.kler.cn/news/359985.html

相关文章:

  • 【论文学习与撰写】,论文word文档中出现乱码的情况,文档中显示的乱码,都是英文字母之类的,但打印预览是正常的
  • Flutter 11 Android原生项目集成Flutter Module
  • 判断索引对象中所有元素是否都为真值pandas.Index.all()
  • 【DBA Part01】国产Linux上安装Oracle进行数据迁移
  • opencv 图像缩放操作 - python 实现
  • 【STM32 HAL库】MPU6050姿态解算 卡尔曼滤波
  • vue富文本使用editor
  • Docker 部署 JDK11 图文并茂简单易懂
  • vbs给qq发送消息
  • redis IO多路复用机制
  • 光伏行业如何借助ERP领跑绿色经济?
  • 【人工智能】一文掌握具身智能机器人的仿真平台,再也不需要东奔西走了。
  • Halcon开启多线程
  • 物联网之超声波测距模块、arduino、esp32
  • 【Flutter】状态管理:高级状态管理 (Riverpod, BLoC)
  • 计算机网络最后错题本-cy
  • Centos7搭建minio对象存储服务器
  • Unity之如何使用Unity Cloud Build云构建
  • 【算法系列-栈与队列】栈与队列的双向模拟
  • 新手向-pkg-config的原理和使用