SpringBoot集成Netty实现Ws和Tcp通信
本教程将指导你如何在 Spring Boot 项目中集成 Netty,实现 WebSocket 和 TCP 通信。以下是详细的步骤和代码示例。
环境准备
在你的 pom.xml 中添加 Netty 依赖:
<!--
  Netty "all-in-one" artifact used by the server and handler classes in this tutorial.
  NOTE(review): 5.0.0.Alpha2 is an alpha release. The WebSocket handler in this tutorial
  overrides messageReceived(...), so the sample code is tied to this API line; confirm
  the handler method names before switching to a different Netty version.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
Ws通信具体模块
1.初始服务端代码
import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
/**
 * Starts the Netty WebSocket server once the Spring Boot application is up.
 *
 * <p>{@link #serverStart(int)} blocks until the server channel is closed, so
 * {@link #run(ApplicationArguments)} does not return while the server is running.
 */
@Slf4j
@Component
public class Init implements ApplicationRunner {

    /**
     * Binds a WebSocket server to {@code port} and blocks until its channel closes.
     *
     * <p>Both event loop groups are shut down when this method exits, whether the
     * server stopped normally or start-up failed.
     *
     * @param port TCP port to listen on
     * @throws RuntimeException wrapping any failure raised while binding or waiting
     */
    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP codec + aggregator are needed for the WebSocket upgrade request.
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new IdleStateHandler(12, 12, 12, TimeUnit.DAYS));
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            pipeline.addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port);
            // Register the listener before sync(): sync() rethrows on failure, so a listener
            // added afterwards could never observe the failed state.
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("Websocket启动成功,端口:{}", port);
                } else {
                    log.warn("Websocket启动失败,端口:{}", port);
                }
            });
            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Release both groups; the worker group was previously never shut down.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Spring Boot start-up hook: launches the WebSocket server on port 7309.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        serverStart(7309);
    }
}
2.信息处理器
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class MsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx){
channelGroups.add(ctx.channel());
SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
ChannelId id = ctx.channel().id();
CHANNEL.put(id, ctx.channel());
log.info("客服端:{} 上线了!",id);
ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
cause.printStackTrace();
ChannelId id = ctx.channel().id();
CHANNEL.remove(id);
channelGroups.remove(ctx.channel());
log.info("客服端:{} 异常断开!",id);
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx){
channelGroups.remove(ctx.channel());
log.info("客服端:{} 断开连接!",ctx.channel().id());
CHANNEL.remove(ctx.channel().id());
ctx.close();
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
if (!CHANNEL.containsKey(ctx.channel().id())) { CHANNEL.put(ctx.channel().id(), ctx.channel());}
String msg = textWebSocketFrame.text();
log.info("客服端:{} 发送消息:{}", ctx.channel().id(), msg );
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端收到您发送的信息:" + msg));
}
}
3.测试用例

可以使用在线 WebSocket 测试工具 http://wstool.js.org/ 进行测试。服务在本机运行时,连接地址为 ws://localhost:7309/websocket(端口和路径与上面服务端代码中的配置一致)。
Tcp通信具体模块
1.初始服务端代码
import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Starts the Netty TCP server once the Spring Boot application is up.
 *
 * <p>{@link #serverStart(int)} blocks until the server channel is closed, so
 * {@link #run(ApplicationArguments)} does not return while the server is running.
 */
@Component
@Slf4j
public class Init implements ApplicationRunner {

    /**
     * Binds a TCP server to {@code port} and blocks until its channel closes.
     *
     * <p>Both event loop groups are shut down when this method exits, whether the
     * server stopped normally or start-up failed.
     *
     * @param port TCP port to listen on
     * @throws RuntimeException wrapping any failure raised while binding or waiting
     */
    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    // NIO-based server channel
                    .channel(NioServerSocketChannel.class)
                    // Backlog size for pending connections on the server socket
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Enable TCP keep-alive on accepted connections
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    // Per-connection pipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port);
            // Register the listener before sync(): sync() rethrows on failure, so a listener
            // added afterwards could never observe the failed state.
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("TcpServer启动成功,端口:{}", port);
                } else {
                    log.error("TcpServer启动失败,端口:{}", port);
                }
            });
            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Release both groups; the worker group was previously never shut down.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Spring Boot start-up hook: launches the TCP server on port 7311.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        serverStart(7311);
    }
}
2.信息处理器代码
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class MsgHandler extends ChannelInboundHandlerAdapter {
public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
channelGroups.add(ctx.channel());
SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
ChannelId id = ctx.channel().id();
CHANNEL.put(id, ctx.channel());
log.info("客服端:{} 上线了!",id);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
channelGroups.remove(ctx.channel());
log.info("客服端:{} 异常!",ctx.channel().id());
CHANNEL.remove(ctx.channel().id());
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
channelGroups.remove(ctx.channel());
log.info("客服端:{} 断开连接!",ctx.channel().id());
CHANNEL.remove(ctx.channel().id());
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf byteBuf) {
// 将 ByteBuf 转换为字符串
String message = byteBuf.toString(CharsetUtil.UTF_8);
log.info("客服端:{} 发送消息:{}", ctx.channel().id(), message);
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("服务端收到您发送的信息:" + message, CharsetUtil.UTF_8));
} else {
log.info("客服端:{} 发送未知类型的消息:{}", ctx.channel().id(), msg);
}
}
}