Spingboot整合Netty,简单示例
Netty介绍在文章末尾 Netty介绍
项目背景
传统socket通信,有需要自身管理整个状态,业务繁杂等问题。
pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.117.Final</version>
</dependency>
代码
封装Netty的服务类
package com.example.server;
import com.example.exception.ServiceException;
import com.example.handler.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer {
// 用于处理连接请求的线程组
private EventLoopGroup bossGroup;
// 用于处理I/O操作的线程组
private EventLoopGroup workerGroup;
// 服务器
private ChannelFuture channelFuture;
/**
* 启动 Netty 服务端
*
* @param port 监听的端口
*/
public void start(int port) {
bossGroup = new NioEventLoopGroup(); // 初始化bossGroup
workerGroup = new NioEventLoopGroup(); // 初始化workerGroup
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用NIO传输
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 添加编解码器和处理器
ch.pipeline().addLast(new NettyServerInitializer());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 设置TCP参数
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口并启动服务器
channelFuture = bootstrap.bind(port).sync();
log.info("Netty 服务端启动成功,监听端口: {}", port);
} catch (InterruptedException e) {
log.error("Netty 服务端启动失败,端口: {}, 异常: {}", port, e.getMessage(), e);
throw new ServiceException("Netty 服务端启动异常");
} catch (Exception e) {
log.error("绑定端口时发生异常: {}", e.getMessage(), e);
throw new ServiceException("端口绑定异常");
}
}
/**
* 停止 Netty 服务端
*/
public void stop() {
if (channelFuture != null) {
channelFuture.channel().close();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
System.out.println("Netty 服务端已关闭");
}
}
封装Netty服务的管理类
用来创建和销毁多个Netty
package com.example.server;
import com.example.exception.ServiceException;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class NettyServerManager {
private final ConcurrentHashMap<Integer, NettyServer> nettyServers = new ConcurrentHashMap<>();
/**
* 启动 Netty 服务端
*
* @param port 监听的端口
*/
public void startNettyServer(int port) {
if (nettyServers.containsKey(port)) {
throw new ServiceException("端口 " + port + " 已被占用");
}
NettyServer nettyServer = new NettyServer();
nettyServer.start(port);
nettyServers.put(port, nettyServer);
}
/**
* 停止 Netty 服务端
*
* @param port 监听的端口
*/
public void stopNettyServer(int port) {
NettyServer nettyServer = nettyServers.get(port);
if (nettyServer != null) {
nettyServer.stop();
nettyServers.remove(port);
}
}
/**
* 获取所有 Netty 服务端实例
*/
public Map<Integer, NettyServer> getNettyServers() {
return nettyServers;
}
}
NettyServer的初始化
package com.example.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
// HTTP 编解码器:支持 HTTP 请求和响应
.addLast(new HttpServerCodec())
// 聚合 HTTP 消息,避免处理分块的 HTTP 请求
.addLast(new HttpObjectAggregator(65536))
// WebSocket 协议处理器,确保路径 `/websocket` 处理 WebSocket 握手和控制帧
.addLast(new WebSocketServerProtocolHandler("/websocket"))
// 字符串解码器,将字节流解析为字符串
.addLast(new StringDecoder(CharsetUtil.UTF_8))
// 字符串编码器,将字符串转换为字节流
.addLast(new StringEncoder(CharsetUtil.UTF_8))
// 自定义消息处理器,用于处理业务逻辑
.addLast(new WebSocketFrameHandler());
}
}
handler处理器类
用来处理具体的消息
package com.example.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WebSocketFrameHandler extends ChannelInboundHandlerAdapter {
@Resource
private UserService userService;
@PostConstruct
public void init() {
webSocketFrameHandler = this;
}
/**
* 当 Channel 变为活跃状态(连接建立)时调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.warn("{} 已连接", ctx.channel().remoteAddress());
}
/**
* 当 Channel 变为不活跃状态(连接关闭)时调用
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.warn("{} 已断开", ctx.channel().remoteAddress());
}
/**
* 当从 Channel 中读取到数据时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
/**
* 当一次数据读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// log.info("Channel 数据读取完成");
ctx.flush();
}
/**
* 当处理过程中发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("处理过程中发生异常,连接关闭: {}", cause.getMessage(), cause);
ctx.close();
}
}
消息传递实体
这里分享一个我用来消息传递的实体,这个实体需要被序列化传递。因为Websocket传递都是文本的形式
Netty介绍
Netty 是一个异步事件驱动的网络应用框架,主要用于快速开发高性能、高可靠性的网络服务器和客户端。它简化了TCP、UDP等协议的编程,广泛应用于WebSocket等。
特性
异步非阻塞:基于事件驱动模型,支持高并发连接。
高性能:优化了网络通信,资源利用率高。
核心组件
- Channel:网络通信的管道,支持多种I/O操作。
- EventLoop:事件循环,处理I/O事件。
- ChannelHandler:处理I/O事件或拦截操作。
- ChannelPipeline:包含多个ChannelHandler,处理入站和出站事件。
- ByteBuf:高效的字节容器,优于JDK的ByteBuffer