当前位置: 首页 > article >正文

Spingboot整合Netty,简单示例

Netty介绍在文章末尾  Netty介绍

项目背景

传统socket通信,有需要自身管理整个状态,业务繁杂等问题。

pom.xml

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.117.Final</version>
</dependency>

代码

封装Netty的服务类

package com.example.server;

import com.example.exception.ServiceException;
import com.example.handler.NettyServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {
	// 用于处理连接请求的线程组
	private EventLoopGroup bossGroup;
	// 用于处理I/O操作的线程组
	private EventLoopGroup workerGroup;
	// 服务器
	private ChannelFuture channelFuture;

	/**
	 * 启动 Netty 服务端
	 *
	 * @param port 监听的端口
	 */
	public void start(int port) {
		bossGroup = new NioEventLoopGroup(); // 初始化bossGroup
		workerGroup = new NioEventLoopGroup(); // 初始化workerGroup

		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class) // 使用NIO传输
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) {
							// 添加编解码器和处理器
							ch.pipeline().addLast(new NettyServerInitializer());
						}
					})
					.option(ChannelOption.SO_BACKLOG, 128) // 设置TCP参数
					.childOption(ChannelOption.SO_KEEPALIVE, true);

			// 绑定端口并启动服务器
			channelFuture = bootstrap.bind(port).sync();
			log.info("Netty 服务端启动成功,监听端口: {}", port);
		} catch (InterruptedException e) {
			log.error("Netty 服务端启动失败,端口: {}, 异常: {}", port, e.getMessage(), e);
			throw new ServiceException("Netty 服务端启动异常");
		} catch (Exception e) {
			log.error("绑定端口时发生异常: {}", e.getMessage(), e);
			throw new ServiceException("端口绑定异常");
		}
	}

	/**
	 * 停止 Netty 服务端
	 */
	public void stop() {
		if (channelFuture != null) {
			channelFuture.channel().close();
		}
		if (bossGroup != null) {
			bossGroup.shutdownGracefully();
		}
		if (workerGroup != null) {
			workerGroup.shutdownGracefully();
		}
		System.out.println("Netty 服务端已关闭");
	}


}

封装Netty服务的管理类

用来创建和销毁多个Netty

package com.example.server;

import com.example.exception.ServiceException;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class NettyServerManager {
	private final ConcurrentHashMap<Integer, NettyServer> nettyServers = new ConcurrentHashMap<>();

	/**
	 * 启动 Netty 服务端
	 *
	 * @param port 监听的端口
	 */
	public void startNettyServer(int port) {
		if (nettyServers.containsKey(port)) {
			throw new ServiceException("端口 " + port + " 已被占用");
		}

		NettyServer nettyServer = new NettyServer();
		nettyServer.start(port);
		nettyServers.put(port, nettyServer);
	}

	/**
	 * 停止 Netty 服务端
	 *
	 * @param port 监听的端口
	 */
	public void stopNettyServer(int port) {
		NettyServer nettyServer = nettyServers.get(port);
		if (nettyServer != null) {
			nettyServer.stop();
			nettyServers.remove(port);
		}
	}

	/**
	 * 获取所有 Netty 服务端实例
	 */
	public Map<Integer, NettyServer> getNettyServers() {
		return nettyServers;
	}
}

NettyServer的初始化

package com.example.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
	@Override
	protected void initChannel(SocketChannel ch) {
		ch.pipeline()
				// HTTP 编解码器:支持 HTTP 请求和响应
				.addLast(new HttpServerCodec())
				// 聚合 HTTP 消息,避免处理分块的 HTTP 请求
				.addLast(new HttpObjectAggregator(65536))
				// WebSocket 协议处理器,确保路径 `/websocket` 处理 WebSocket 握手和控制帧
				.addLast(new WebSocketServerProtocolHandler("/websocket"))
				// 字符串解码器,将字节流解析为字符串
				.addLast(new StringDecoder(CharsetUtil.UTF_8))
				// 字符串编码器,将字符串转换为字节流
				.addLast(new StringEncoder(CharsetUtil.UTF_8))
				// 自定义消息处理器,用于处理业务逻辑
				.addLast(new WebSocketFrameHandler());
	}
}

handler处理器类

用来处理具体的消息

package com.example.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WebSocketFrameHandler extends ChannelInboundHandlerAdapter {
	@Resource
	private UserService userService;


	@PostConstruct
	public void init() {
		webSocketFrameHandler = this;
	}


	/**
	 * 当 Channel 变为活跃状态(连接建立)时调用
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		log.warn("{} 已连接", ctx.channel().remoteAddress());
	}

	/**
	 * 当 Channel 变为不活跃状态(连接关闭)时调用
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) {
		log.warn("{} 已断开", ctx.channel().remoteAddress());
	}

	/**
	 * 当从 Channel 中读取到数据时调用
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	
	}

	/**
	 * 当一次数据读取完成时调用
	 */
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		// log.info("Channel 数据读取完成");
		ctx.flush();
	}

	/**
	 * 当处理过程中发生异常时调用
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		log.error("处理过程中发生异常,连接关闭: {}", cause.getMessage(), cause);
		ctx.close();
	}
}

消息传递实体

        这里分享一个我用来消息传递的实体,这个实体需要被序列化传递。因为Websocket传递都是文本的形式


Netty介绍

Netty 是一个异步事件驱动的网络应用框架,主要用于快速开发高性能、高可靠性的网络服务器和客户端。它简化了TCP、UDP等协议的编程,广泛应用于WebSocket等。

特性

异步非阻塞:基于事件驱动模型,支持高并发连接。

高性能:优化了网络通信,资源利用率高。

核心组件

  1. Channel:网络通信的管道,支持多种I/O操作。
  2. EventLoop:事件循环,处理I/O事件。
  3. ChannelHandler:处理I/O事件或拦截操作。
  4. ChannelPipeline:包含多个ChannelHandler,处理入站和出站事件。
  5. ByteBuf:高效的字节容器,优于JDK的ByteBuffer

http://www.kler.cn/a/513516.html

相关文章:

  • 细说STM32F407单片机电源低功耗StopMode模式及应用示例
  • Linux TCP 之 RTT 采集与 RTO 计算
  • 智能新浪潮:亚马逊云科技发布Amazon Nova模型
  • ElasticSearch DSL查询之排序和分页
  • elasticsearch 数据导出/导入
  • 数据结构-队列
  • HJ108 求最小公倍数(Java版本)
  • Nim游戏算法问题(Java)
  • 颜色分配问题
  • 深入理解 Java 的数据类型与运算符
  • Cannot resolve symbol ‘XXX‘ Maven 依赖问题的解决过程
  • 55.命名、驼峰式、帕斯卡式 C#例子
  • MySQL表创建分区键
  • 37.构造回文字符串问题|Marscode AI刷题
  • PHP语言的网络编程
  • 深度学习 · 手撕 DeepLearning4J ,用Java实现手写数字识别 (附UI效果展示)
  • 【BUUCTF】[RCTF2015]EasySQL1
  • AT9880U-B-F8N-23北斗多频导航芯片车规级数据手册
  • Docker入门学习
  • cf<contest/1950>练习-python版
  • Django学习笔记(安装和环境配置)-01
  • 元素周期表
  • jvm学习总结
  • Spark SQL中的from_json函数详解
  • mac 配置 python 环境变量
  • 2023年12月GESP C++ 六级认证真题——工作沟通