netty系列(四)websocket client和server
(今天就把netty笔记搬上来和更新一下,没有去整理了)
netty版本4.1.99.final
本文主要记录了netty中websocket handler的源码处理流程
1:先简要概括下netty使用websocket流程
websocket请求流程:client先发送一个http请求表示想要建立websocket连接,然后服务端解析http请求,发现这是个http请求websocket的握手请求,所以建立websocket连接,当处理完第一个http请求后,后续就是websocket请求了,所以服务端在处理完第一个http请求后就要移除pipeline中的http相关handler,同样,client在收到请求完成以后也要删除http相关handler,因为后续的协议是websocket而不是http,而httphandler是无法解析websocket协议请求的,所以要移除
2:测试代码-server和client
server:
package org.example;
public class Server {
public static void main(String[] args) throws Exception {
new Server().start(7070);
}
public void start(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandler
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//!!!ws://localhost:7070/helloWs 可以访问server
//!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除
pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs"));
pipeline.addLast(new SimpleChannelInboundHandler<WebSocketFrame>() {
@Override
protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
// 处理文本消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("收到客户端消息: " + request);
// 回复客户端消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息: " + request));
} else {
System.out.println("收到非文本消息,不支持处理");
}
}
@Override
public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接: " + ctx.channel().remoteAddress());
}
@Override
public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开连接: " + ctx.channel().remoteAddress());
}
});
}
});
ChannelFuture f = b.bind(port).sync();
System.out.println("服务器启动,监听端口: " + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
client:
package org.example;
public class Client {
final CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Client client = new Client();
client.test();
}
public void test() throws Exception {
Channel dest = dest();
latch.await();
dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
}
public Channel dest() throws Exception {
final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap boot = new Bootstrap();
boot.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
//!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandler
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//!!!ws://localhost:7070/helloWs 可以访问server
//!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除
pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));
pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
System.err.println(" 客户端收到消息======== " + msg.text());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
.equals(evt)) {
System.out.println(ctx.name()+" 握手完成!");
latch.countDown();
send(ctx.channel());
}
super.userEventTriggered(ctx, evt);
}
});
}
});
ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();
return cf.channel();
}
public static void send(Channel channel) {
final String textMsg = "握手完成后直接发送的消息";
if (channel != null && channel.isActive()) {
TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isDone() && channelFuture.isSuccess()) {
System.out.println(" ================= 发送成功.");
} else {
channelFuture.channel().close();
System.out.println(" ================= 发送失败. cause = " + channelFuture.cause());
channelFuture.cause().printStackTrace();
}
});
} else {
System.out.println("消息发送失败! textMsg = " + textMsg);
}
}
}
2024/10/11 哎,今天没事干。。。不怎么想上班。。看看源码摸摸鱼算了
2:源码笔记
!!!注意:每个连接都会有一套pipeline,即pipeline是专属的,而不是共用的
server的websocket handler详解:
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//1:httpHandler,处理websocket的第一个http请求,处理完后会被删除
pipeline.addLast(new HttpServerCodec()); #httpHandler
pipeline.addLast(new HttpObjectAggregator(64 * 1024)); #httpHandler
//2:处理websocket协议
pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs")) #websocket协议handler,做2件事
#1:channel建立时自动添加2个websocket handler
#2:这些添加的handler在处理完第一个http请求后会自动移除httpHandler
WebSocketServerProtocolHandshakeHandler.handlerAdded #handlerAdded:当channel建立后netty会给channel添加handler
#当添加完以后就会调用handler的handlerAdded方法
ChannelPipeline.addBefore(new WebSocketServerProtocolHandshakeHandler) ##添加websocket handleShake handler即握手处理器
#当握手完成后即处理完第一个http请求后会移除自己和其他httpHandler
#因为websocket只有第一个请求才是http请求
WebSocketServerProtocolHandshakeHandler.channelRead #WebSocketServerProtocolHandshakeHandler chanelRead只处理http请求
if httpObject instanceof HttpRequest: #就是处理器第一个http请求,处理后再移除httphandler和自己
handler=WebSocketServerHandshakerFactory.newHandshaker #如果第一个请求是http请求,那么就创建webSocket HandShaker
WebSocketServerProtocolHandler.setHandshaker(handler)
ctx.pipeline().remove(this) #这里就是WebSocketServerProtocolHandshakeHandler移除自己
WebSocketServerHandshake.handsShake #处理握手请求
ChannelPipeline.remove(HttpObjectAggregator.class) #移除HttpObjectAggregator Handler
ChannelPipeline.remove(HttpContentCompressor.class)#移除htpContentCompressor handler
ChannelPipeline.addBefore(newWebSocketEncoder()) #添加websocket encoder
ChannelPipeline.addBefore(newWebsocketDecoder()) #添加websocket decoder
#!!!缕一下:WebSocketServerProtocolHandler的handlerAdded
#!!!在WebSocketServerProtocolHandler之前添加一个握手handler,
#!!!这个handler就是WebSocketServerProtocolHandshakeHandler
#!!!而WebSocketServerProtocolHandshakeHandler处理完成后会重置
#!!!WebSocketServerProtocolHandshakeHandler的handler
#!!!然后再移除自己,因为这是第一个http请求,处理完第一个http请求就不需要握手了
#!!!handsshake负责具体的握手逻辑,握手逻辑包括两个操作:
#!!!1:移除httphandler,因为后续就是websocket协议请求了,httphandler无法解析
#!!!2:添加websocket 编解码器
ctx.fireUserEventTriggered(HANDSHAKE_COMPLETE) #处理完握手后发送websocket握手完成事件
ctx.fireUserEventTriggered(HandshakeComplete)
ChannelPipeline.addBefore(new Utf8FrameValidator) #添加websocket验证handler
//3:处理webscoket请求
pipeline.addLast(new MyWebSocketHandler<WebSocketFrame>()) #处理webSocket请求
if (frame instanceof TextWebSocketFrame) { #捋一下:如果不是websocket请求,那么这里就会跳过
#如果websocket握手阶段已完成,那么在此之前就会有一个websocket decoder
#websocket decoder会输出TextWebSocketFrame对象
#我们的handler检测到TextWebSocketFrame对象,就会进行处理了
..处理websocket请求... #至此websocket流程就讲完了,client端也是一样的逻辑:
#client端也是一样的逻辑:1:握手;2:握手完成后删除httpHandler
}
client的websocket详解:
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new WebSocketClientProtocolHandler("/helloWs")) #一样的逻辑:添加handshake处理握手,
#处理完后删除httphandler和自己,然后添加编解码器
WebSocketClientProtocolHandler.handlerAdded
ChannelPipeline.addBefore(WebSocketClientProtocolHandshakeHandler) #添加握手handler处理第一个http握手请求
WebSocketClientProtocolHandshakeHandler.channelActive #连接建立后会发起握手,握手完成后会添加websocket 编码器
WebSocketClientHandsShake.handshake #
FullHttpRequest request = newHandshakeRequest() #创建http 握手请求
channel.writeAndFlush(request) #发起握手
addLister.operationComplete #添加listner,当握手完成后会执行
ChannelPipeline.addAfter(WebSocketEncoder()) #添加websocket编码器
addLister.operationComplete
ctx.fireUserEventTriggered(HANDSHAKE_ISSUED) #握手完成后发送握手完成事件
WebSocketClientProtocolHandshakeHandler.channelRead
if !(msg instanceof FullHttpResponse): #只处理第一个http请求,如果不是http请求,则跳过
ctx.fireChannelRead(msg)
return
WebSocketClientHandshake.finishHandshake #完成握手
...校验http请求是不是server发来的websocket响应,略...
ChannelPipeline.remove(HttpContentDecompressor.class) #握手完成后移除httphandler,因为后续就是websocket请求了
ChannelPipeline.remove(HttpObjectAggregator.class)
ChannelPipeline.addAfter(WebsocketDecoder()) #添加websocket解码器
ChannelPipeline.addBefore(new Utf8FrameValidator) #添加websocket验证handler
pipeline.addLast(new MyWebSocketHandler<WebSocketFrame>())