当前位置: 首页 > article >正文

netty系列(四)websocket client和server

(今天就把netty笔记搬上来和更新一下,没有去整理了)
netty版本4.1.99.final

本文主要记录了netty中websocket handler的源码处理流程

1:先简要概括下netty使用websocket流程

websocket请求流程:client先发送一个http请求表示想要建立websocket连接,然后服务端解析http请求,发现这是个http请求websocket的握手请求,所以建立websocket连接,当处理完第一个http请求后,后续就是websocket请求了,所以服务端在处理完第一个http请求后就要移除pipeline中的http相关handler,同样,client在收到请求完成以后也要删除http相关handler,因为后续的协议是websocket而不是http,而httphandler是无法解析websocket协议请求的,所以要移除

2:测试代码-server和client

server:

package org.example;
public class Server {

    public static void main(String[] args) throws Exception {
        new Server().start(7070);
    }
    public void start(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandler
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                            //!!!ws://localhost:7070/helloWs 可以访问server
                            //!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除
                            pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs"));
                            pipeline.addLast(new SimpleChannelInboundHandler<WebSocketFrame>() {
                                @Override
                                protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
                                    if (frame instanceof TextWebSocketFrame) {
                                        // 处理文本消息
                                        String request = ((TextWebSocketFrame) frame).text();
                                        System.out.println("收到客户端消息: " + request);
                                        // 回复客户端消息
                                        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息: " + request));
                                    } else {
                                        System.out.println("收到非文本消息,不支持处理");
                                    }
                                }

                                @Override
                                public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    cause.printStackTrace();
                                    ctx.close();
                                }

                                @Override
                                public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("客户端连接: " + ctx.channel().remoteAddress());
                                }

                                @Override
                                public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("客户端断开连接: " + ctx.channel().remoteAddress());
                                }
                            });
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务器启动,监听端口: " + port);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

client:


package org.example;
 
public class Client {
  
    final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.test();
    }

    public void test() throws Exception {
        Channel dest = dest();
        latch.await();
        dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
    }

    public Channel dest() throws Exception {
        final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");

        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap boot = new Bootstrap();
        boot.option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        ChannelPipeline pipeline = sc.pipeline();
                        //!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandler
                        pipeline.addLast(new HttpClientCodec());
                        pipeline.addLast(new ChunkedWriteHandler());
                        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                        //!!!ws://localhost:7070/helloWs 可以访问server
                        //!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除
                        pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
                                .newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));
                        pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
                                    throws Exception {
                                System.err.println(" 客户端收到消息======== " + msg.text());
                            }

                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
                                        .equals(evt)) {
                                    System.out.println(ctx.name()+" 握手完成!");
                                    latch.countDown();
                                    send(ctx.channel());
                                }
                                super.userEventTriggered(ctx, evt);
                            }
                        });

                    }
                });

        ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();
  
        return cf.channel();
    }

    public static void send(Channel channel) {
        final String textMsg = "握手完成后直接发送的消息";

        if (channel != null && channel.isActive()) {
            TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
            channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
                if (channelFuture.isDone() && channelFuture.isSuccess()) {
                    System.out.println("     ================= 发送成功.");
                } else {
                    channelFuture.channel().close();
                    System.out.println("     ================= 发送失败. cause = " + channelFuture.cause());
                    channelFuture.cause().printStackTrace();
                }
            });
        } else {
            System.out.println("消息发送失败! textMsg = " + textMsg);
        }
    }

}

2024/10/11 哎,今天没事干。。。不怎么想上班。。看看源码摸摸鱼算了

2:源码笔记

!!!注意:每个连接都会有一套pipeline,即pipeline是专属的,而不是共用的

server的websocket handler详解:

.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  protected void initChannel(SocketChannel sc) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    //1:httpHandler,处理websocket的第一个http请求,处理完后会被删除         
    pipeline.addLast(new HttpServerCodec());                         #httpHandler
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));           #httpHandler

    //2:处理websocket协议
    pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs")) #websocket协议handler,做2件事
                                                                     #1:channel建立时自动添加2个websocket handler
                                                                     #2:这些添加的handler在处理完第一个http请求后会自动移除httpHandler

      WebSocketServerProtocolHandshakeHandler.handlerAdded           #handlerAdded:当channel建立后netty会给channel添加handler
                                                                     #当添加完以后就会调用handler的handlerAdded方法

        ChannelPipeline.addBefore(new WebSocketServerProtocolHandshakeHandler) ##添加websocket handleShake handler即握手处理器
                                                                     #当握手完成后即处理完第一个http请求后会移除自己和其他httpHandler
                                                                     #因为websocket只有第一个请求才是http请求

          WebSocketServerProtocolHandshakeHandler.channelRead        #WebSocketServerProtocolHandshakeHandler chanelRead只处理http请求
            if httpObject instanceof HttpRequest:                    #就是处理器第一个http请求,处理后再移除httphandler和自己
              handler=WebSocketServerHandshakerFactory.newHandshaker #如果第一个请求是http请求,那么就创建webSocket HandShaker
                WebSocketServerProtocolHandler.setHandshaker(handler)
                ctx.pipeline().remove(this)                          #这里就是WebSocketServerProtocolHandshakeHandler移除自己
                WebSocketServerHandshake.handsShake                  #处理握手请求
                  ChannelPipeline.remove(HttpObjectAggregator.class) #移除HttpObjectAggregator Handler
                  ChannelPipeline.remove(HttpContentCompressor.class)#移除htpContentCompressor handler

                  ChannelPipeline.addBefore(newWebSocketEncoder())   #添加websocket encoder
                  ChannelPipeline.addBefore(newWebsocketDecoder())   #添加websocket decoder
                                                                     #!!!缕一下:WebSocketServerProtocolHandler的handlerAdded
                                                                     #!!!在WebSocketServerProtocolHandler之前添加一个握手handler,
                                                                     #!!!这个handler就是WebSocketServerProtocolHandshakeHandler
                                                                     #!!!而WebSocketServerProtocolHandshakeHandler处理完成后会重置
                                                                     #!!!WebSocketServerProtocolHandshakeHandler的handler 
                                                                     #!!!然后再移除自己,因为这是第一个http请求,处理完第一个http请求就不需要握手了
                                                                     #!!!handsshake负责具体的握手逻辑,握手逻辑包括两个操作:
                                                                     #!!!1:移除httphandler,因为后续就是websocket协议请求了,httphandler无法解析
                                                                     #!!!2:添加websocket 编解码器

                ctx.fireUserEventTriggered(HANDSHAKE_COMPLETE)       #处理完握手后发送websocket握手完成事件
                ctx.fireUserEventTriggered(HandshakeComplete)

        ChannelPipeline.addBefore(new Utf8FrameValidator)            #添加websocket验证handler
    //3:处理webscoket请求                                          
    pipeline.addLast(new MyWebSocketHandler<WebSocketFrame>())       #处理webSocket请求
      if (frame instanceof TextWebSocketFrame) {                     #捋一下:如果不是websocket请求,那么这里就会跳过
                                                                     #如果websocket握手阶段已完成,那么在此之前就会有一个websocket decoder
                                                                     #websocket decoder会输出TextWebSocketFrame对象
                                                                     #我们的handler检测到TextWebSocketFrame对象,就会进行处理了                      
           ..处理websocket请求...                                     #至此websocket流程就讲完了,client端也是一样的逻辑:
                                                                     #client端也是一样的逻辑:1:握手;2:握手完成后删除httpHandler
      }


client的websocket详解:

    pipeline.addLast(new HttpClientCodec());
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));
    pipeline.addLast(new WebSocketClientProtocolHandler("/helloWs"))        #一样的逻辑:添加handshake处理握手,
                                                                            #处理完后删除httphandler和自己,然后添加编解码器
      WebSocketClientProtocolHandler.handlerAdded
        ChannelPipeline.addBefore(WebSocketClientProtocolHandshakeHandler)  #添加握手handler处理第一个http握手请求
          WebSocketClientProtocolHandshakeHandler.channelActive             #连接建立后会发起握手,握手完成后会添加websocket 编码器
            WebSocketClientHandsShake.handshake                             #
              FullHttpRequest request = newHandshakeRequest()               #创建http 握手请求
              channel.writeAndFlush(request)                                #发起握手
              addLister.operationComplete                                   #添加listner,当握手完成后会执行
                ChannelPipeline.addAfter(WebSocketEncoder())                #添加websocket编码器
              addLister.operationComplete 
            ctx.fireUserEventTriggered(HANDSHAKE_ISSUED)                    #握手完成后发送握手完成事件
          WebSocketClientProtocolHandshakeHandler.channelRead
            if !(msg instanceof FullHttpResponse):                          #只处理第一个http请求,如果不是http请求,则跳过
              ctx.fireChannelRead(msg)
              return
            WebSocketClientHandshake.finishHandshake                        #完成握手
              ...校验http请求是不是server发来的websocket响应,略...  
              ChannelPipeline.remove(HttpContentDecompressor.class)         #握手完成后移除httphandler,因为后续就是websocket请求了
              ChannelPipeline.remove(HttpObjectAggregator.class)
              ChannelPipeline.addAfter(WebsocketDecoder())                  #添加websocket解码器
        ChannelPipeline.addBefore(new Utf8FrameValidator)                   #添加websocket验证handler
    pipeline.addLast(new MyWebSocketHandler<WebSocketFrame>())
  

http://www.kler.cn/a/470545.html

相关文章:

  • 嵌入式ARM平台 openwrt系统下 基于FFmpeg 的视频采集及推流 实践
  • 嵌入式驱动开发详解10(MISC杂项实现)
  • 阿尔法linux开发板ping不通百度
  • EntityFramework Core 数据种子
  • 计算机网络 (22)网际协议IP
  • react 封装一个类函数使用方法
  • 用CRD定义未来:解锁机器学习平台的无限可能
  • ollama+FastAPI部署后端大模型调用接口
  • 修改 页面 滚动条样式
  • 【React】漫游式引导
  • java开发springoot
  • 【苏德矿高等数学】第1讲:有界函数、无界函数、复合函数
  • DeepSpeed是什么,怎样使用
  • 个性化电影推荐系统|Java|SSM|JSP|
  • 【形式篇】年终总结怎么写:PPT如何将内容更好地表现出来
  • 5.C语言流程控制语句详解:if、switch、while、for、break、continue等
  • Vue Router v3.x 迁移到 v4.x(两者的区别)【路由篇】
  • 【模电】功率放大电路总结
  • 【C++数据结构——查找】二分查找(头歌实践教学平台习题)【合集】
  • 【渗透测试术语总结】
  • Zero to JupyterHub with Kubernetes 下篇 - Jupyterhub on k8s
  • 人工智能的发展领域之GPU加速计算的应用概述、架构介绍与教学过程
  • 【H3CNE邓方鸣】路由协议概述+2025.1.5
  • SQLite 的未来发展与展望
  • 【vue3封装element-plus的反馈组件el-drawer、el-dialog】
  • 解决 IntelliJ IDEA 中 Tomcat 日志乱码问题的详细指南