Netty基础—7.Netty实现消息推送服务一
大纲
1.Netty实现HTTP服务器
2.Netty实现WebSocket
3.Netty实现的消息推送系统
(1)基于WebSocket的消息推送系统说明
(2)消息推送系统的PushServer
(3)消息推送系统的连接管理封装
(4)消息推送系统的ping-pong探测
(5)消息推送系统的全连接推送
(6)消息推送系统的HTTP响应和握手
(7)消息推送系统的运营客户端
(8)运营客户端连接PushServer
(9)运营客户端的Handler处理器
(10)运营客户端发送推送消息
(11)浏览器客户端接收推送消息
1.Netty实现HTTP服务器
(1)HTTP请求消息和响应消息
(2)Netty实现的HTTP协议栈的优势
(3)Netty实现的HTTP服务器
(4)请求的解析处理和响应的编码处理
(1)HTTP请求消息和响应消息
HTTP请求消息由三部分组成:请求行、请求头、请求体。HTTP响应消息也由三部分组成:响应行、响应头、响应体。
(2)Netty实现的HTTP协议栈的优势
Netty实现的HTTP协议栈无论在性能还是在可靠性上,都表现优异,非常适合在非Web容器下的场景使用。相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。
(3)Netty实现的HTTP服务器
public class NettyHttpServer {
private static final Logger logger = LogManager.getLogger(NettyHttpServer.class);
private static final int DEFAULT_PORT = 8998;
private int port;
public NettyHttpServer(int port) {
this.port = port;
}
public void start() throws Exception {
logger.info("Netty Http Server is starting.");
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//数据进来是自上而下,数据回去时自下而上
.addLast("http-decoder", new HttpRequestDecoder())
.addLast("http-aggregator", new HttpObjectAggregator(65536))
.addLast("http-encoder", new HttpResponseEncoder())
.addLast("http-chunked", new ChunkedWriteHandler())
.addLast("netty-http-server-handler", new NettyHttpServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
logger.info("Netty Http Server is started, listened[" + port + "].");
channelFuture.channel().closeFuture().sync();
} finally {
bossEventLoopGroup.shutdownGracefully();
workerEventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyHttpServer nettyHttpServer = new NettyHttpServer(DEFAULT_PORT);
nettyHttpServer.start();
}
}
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
logger.info(request);
}
}
启动NettyHttpServer,然后在浏览器进行访问,就可以看到输出如下:
Netty Http Server is starting.
Netty Http Server is started, listened[8998].
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET / HTTP/1.1
Host: localhost:8998
Connection: keep-alive
sec-ch-ua: ".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "macOS"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,id;q=0.7,ar;q=0.6
Cookie: _ga=GA1.1.629604539.1641093986
content-length: 0
(4)请求的解析处理和响应的编码处理
HTTP服务器最关键的就是请求的解析处理和响应的编码处理,比如会向ChannelPipeline中先后添加不同的解码器和编码器。
首先添加HTTP请求消息解码器HttpRequestDecoder,因为浏览器会把按照HTTP协议组织起来的请求数据序列化成字节数组发送给服务器,而HttpRequestDecoder可以按照HTTP协议从接收到的字节数组中读取出一个完整的请求数据。
然后添加HttpObjectAggregator解码器,它的作用是将多个消息转换为单一的FullHttpRequest或者FullHttpResponse。
接着添加HTTP响应消息编码器HttpResponseEncoder,它的作用是对HTTP响应消息进行编码。
以及添加ChunkedWriteHandler处理器,用来支持异步发送大的码流时也不会占用过多的内存,防止内存溢出。
最后添加NettyHttpServerHandler处理器,用于处理HTTP服务器的响应输出。
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
String method = request.getMethod().name();
String uri = request.getUri();
logger.info("Receives Http Request: " + method + " " + uri + ".");
String html = "<html><body>Hello, I am Netty Http Server.</body></html>";
ByteBuf byteBuf = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set("content-type", "text/html;charset=UTF-8");
response.content().writeBytes(byteBuf);
byteBuf.release();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
2.Netty实现WebSocket
(1)HTTP协议的弊端
(2)消息推送之Ajax短轮询
(3)消息推送之WebSocket
(4)WebSocket连接的建立
(5)基于WebSocket协议开发NettyServer
(6)WebSocketServer的请求数据处理逻辑开发
(7)WebSocketServer的HTTP与chunk处理分析
(8)WebSocket网页客户端代码开发与实现
(1)HTTP协议的弊端
一.HTTP协议为半双工通信
半双工通信指数据可以在客户端和服务端两个方向传输,但是不能同时传输。它意味着同一时刻,只能有一个方向上的数据在进行传输。
二.HTTP消息冗长而繁琐
HTTP消息包含消息头、消息体、换行符等,通常情况下采用文本方式传输。相比于其他的二进制通信协议,冗长而繁琐。
(2)消息推送之Ajax短轮询
Ajax短轮询是基于HTTP短连接的。具体就是用一个定时器由浏览器对服务器发出HTTP请求,由服务器返回数据给客户端浏览器。
Ajax短轮询的代码实现简单。但由于HTTP请求的Header非常冗长,里面可用数据的比例非常低,所以比较占用带宽和服务器资源,且数据同步不及时。
(3)消息推送之WebSocket
WebSocket本质是一个TCP连接,采用的是全双工通信。
WebSocket中,浏览器和服务器只需要做一个握手动作,两者之间就会形成一条快速通道进行直接数据传输。由于WebSocket基于TCP双向全双工进行消息传递,所以在同一时刻既可以发送消息,也可以接收消息,比HTTP半双工性能好。
WebSocket通过ping/pong帧来保持链路激活,实时性很高,但是浏览器支持度低、代码实现复杂。
(4)WebSocket连接的建立
建立WebSocket连接时,需要通过客户端或者浏览器发出握手请求,这个握手请求是一个HTTP请求。该HTTP请求包含了附加头信息"Upgrade:WebSocket",表明这是一个申请协议升级的HTTP请求。
服务端解析这些附加的头信息,然后生成应答信息返回给客户端。这样客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由地传递信息,并且这个连接会持续存在直到一方主动关闭连接。
(5)基于WebSocket协议开发NettyServer
可以基于TCP协议用Netty来开发客户端和服务端进行相互通信,但粘包半包问题需要手动进行处理。
可以基于HTTP协议用Netty来开发一个HTTP服务器,服务器接收浏览器发送过来的HTTP请求后,返回HTTP响应回浏览器。
也可以基于WebSocket协议来开发一个Netty服务器,此时前端HTML代码会基于socket协议和Netty服务器建立长连接。这样Netty服务器就可以和浏览器里运行的HTML通过WebSocket协议建立长连接,从而使得Netty服务器可以主动推送数据给浏览器里的HTML页面。
WebSocket协议底层也是基于TCP协议来实现的,只不过是在TCP协议的基础上封装了一层更高层次的WebSocket协议。
public class NettyWebSocketServer {
private static final Logger logger = LogManager.getLogger(NettyWebSocketServer.class);
private static final int DEFAULT_PORT = 8998;
private int port;
public NettyWebSocketServer(int port) {
this.port = port;
}
public void start() throws Exception {
logger.info("Netty WebSocket Server is starting.");
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(1024 * 32))
.addLast(new WebSocketServerProtocolHandler("/websocket"))
.addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
logger.info("Netty WebSocket Server server is started, listened[" + port + "].");
channelFuture.channel().closeFuture().sync();
} finally {
bossEventLoopGroup.shutdownGracefully();
workerEventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyWebSocketServer nettyHttpServer = new NettyWebSocketServer(DEFAULT_PORT);
nettyHttpServer.start();
}
}
(6)WebSocketServer的请求数据处理逻辑开发
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LogManager.getLogger(NettyWebSocketServerHandler.class);
private static ChannelGroup webSocketClients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//WebSocket网页代码里发送过来的数据
String request = msg.text();
logger.info("Netty Server receives request: " + request + ".");
TextWebSocketFrame response = new TextWebSocketFrame("Hello, I am Netty Server.");
ctx.writeAndFlush(response);
}
//如果一个网页WebSocket客户端跟Netty Server建立了连接之后,会触发这个方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
webSocketClients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
logger.info("websocket client is closed, channel id: " + ctx.channel().id().asLongText() + "[" + ctx.channel().id().asShortText() + "]");
}
}
(7)WebSocketServer的HTTP与chunk处理分析
浏览器里面运行的是HTML网页代码,WebSocket就是在HTML网页代码里嵌入WebSocket代码,可以让浏览器里的HTML网页代码跟NettyServer建立连接,并且是基于长连接发送数据。
浏览器会按照WebSocket协议(底层基于HTTP协议)组织请求数据格式,然后把数据序列化成字节数组,接着通过网络连接把字节数组传输发送给NettyServer,NettyServer会通过数据处理链条来进行处理接收到的字节数组数据。
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//浏览器的字节数组数据进来以后,字节数组数据先用http协议来处理,把字节数组转换为一个HttpRequest请求对象
//最后数据返回给浏览器前,又会对HttpResponse对象进行编码成字节数组
.addLast(new HttpServerCodec())
//chunked write用于大量数据流时的分chunk块,也就是数据实在是太大了就必须得分chunk
//大量数据流进来时,可以分chunk块来读;大量数据流出去时,可以分chunk块来写;
.addLast(new ChunkedWriteHandler())
//如果想要让很多http不要拆分成很多段过来,可以把完整的请求数据聚合到一起再给过来
.addLast(new HttpObjectAggregator(1024 * 32))
//基于前面已经转换好的请求数据对象,会在这里基于WebSocket协议再次做一个处理
//由于传输时是基于http协议传输过来的,而封装的内容是按webSocket协议来封装的http请求数据
//所以必须在这里提取http请求里面的数据,然后按照WebSocket协议来进行解析处理,把数据提取出来作为WebSocket的数据片段
.addLast(new WebSocketServerProtocolHandler("/websocket"))
//响应数据输出时,顺序是反的,第一步原始数据必须先经过WebSocket协议转换
//WebSocket协议数据,必须经过HTTP协议处理,但最终会encode编码成一个HTTP协议的响应数据
//然后服务端将HTTP响应数据序列化的字节数组,传输给浏览器
//浏览器拿到字节数组后进行反序列化,拿到一个HTTP协议响应数据,提取出内容再按照WebSocket协议来处理
//最终把普通的数据给WebSocket代码
.addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());
}
(8)WebSocket网页客户端代码开发与实现
<!DOCTYPE html>
<html lang="en">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>websocket网页</title>
</head>
<body onload="connectServer();">
<script type="text/javascript">
var websocket;
function connectServer() {
if ("WebSocket" in window) {
console.log("Your browser supports websocket!");
websocket = new WebSocket("ws://localhost:8998/websocket");
websocket.onopen = function() {
console.log("Send request to netty server.");
websocket.send("I am websocket client.");
}
websocket.onmessage = function(ev) {
var response = ev.data;
console.log("Receive response from netty server: " + response);
}
}
}
</script>
</body>
</html>
启动Netty Server,然后打开该HTML即可看到console的输出。
WebSocket和Netty Server配合起来开发说明:
如果Server端要主动推送一些通知(push)给网页端正在浏览网页的用户,如推送用户可能感兴趣的商品、关注的新闻。那么在用户进入网页后可以询问用户,是否愿意收到服务端发送的xx提示和通知。如果用户愿意,那么网页里的WebSocket完全可以跟NettyServer构建一个长连接。这样NettyServer在必要时,就可以反向推送通知(push)给用户,浏览器里的网页可能会弹出一个push通知用户xx讯息。