当前位置: 首页 > article >正文

SpringBoot集成Netty实现Ws和Tcp通信

        本教程将指导你如何在 Spring Boot 项目中集成 Netty,实现 WebSocket 和 TCP 通信。以下是详细的步骤和代码示例。  

环境准备

在你的 pom.xml 中添加 Netty 依赖:

<!-- Netty all-in-one artifact.
     NOTE(review): 5.0.0.Alpha2 is an alpha release. The WebSocket handler in this
     article overrides messageReceived(...), which presumably only exists in this
     API line; confirm handler method names before switching to another version. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Ws通信具体模块

1.初始服务端代码

import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;


/**
 * Starts the Netty WebSocket server once the Spring Boot application has started.
 *
 * <p>The server listens on port 7309 and upgrades HTTP requests on {@code /websocket}
 * to WebSocket connections whose text frames are handled by {@link MsgHandler}.
 */
@Slf4j
@Component
public class Init implements ApplicationRunner {

    /**
     * Binds a WebSocket server to {@code port} and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws RuntimeException if the bind fails or the calling thread is interrupted;
     *                          the original exception is preserved as the cause
     */
    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP codec is required for the initial WebSocket upgrade request.
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new ChunkedWriteHandler());
                            // NOTE(review): idle events fire only after 12 days and no visible
                            // handler reacts to IdleStateEvent; confirm the intended timeout.
                            pipeline.addLast(new IdleStateHandler(12, 12, 12, TimeUnit.DAYS));
                            // Aggregate HTTP message parts into a full request (max 64 KiB).
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            pipeline.addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.addListener((ChannelFutureListener) bindResult -> {
                if (bindResult.isSuccess()) {
                    log.info("Websocket启动成功,端口:{}", port);
                } else {
                    log.warn("Websocket启动失败,端口:{}", port);
                }
            });
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Shut down BOTH groups; the original released bossGroup twice and leaked workerGroup.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the WebSocket server on port 7309.
     *
     * <p>This call does not return while the server is running, because
     * {@link #serverStart(int)} waits for the server channel to close.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        serverStart(7309);
    }
}

2.信息处理器

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class MsgHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
    private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx){
        channelGroups.add(ctx.channel());
        SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
        ChannelId id = ctx.channel().id();
        CHANNEL.put(id, ctx.channel());
        log.info("客服端:{} 上线了!",id);
        ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        cause.printStackTrace();
        ChannelId id = ctx.channel().id();
        CHANNEL.remove(id);
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 异常断开!",id);
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx){
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 断开连接!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();
    }


    @Override
    protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        if (!CHANNEL.containsKey(ctx.channel().id())) { CHANNEL.put(ctx.channel().id(), ctx.channel());}
        String msg = textWebSocketFrame.text();
        log.info("客服端:{} 发送消息:{}", ctx.channel().id(), msg );
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端收到您发送的信息:" + msg));
    }
}

3.测试用例 

测试案例
Ws测试用例

WebSocket 测试网站:http://wstool.js.org/ (按上文服务端代码,连接地址为 ws://localhost:7309/websocket)

Tcp通信具体模块

1.初始服务端代码



import com.leyting.handler.MsgHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Starts the Netty TCP server once the Spring Boot application has started.
 *
 * <p>The server listens on port 7311 and hands every accepted connection to a new
 * {@link MsgHandler}.
 */
@Component
@Slf4j
public class Init implements ApplicationRunner {

    /**
     * Binds a TCP server to {@code port} and blocks the calling thread until the server
     * channel is closed. Both event loop groups are shut down before returning.
     *
     * @param port TCP port to listen on
     * @throws RuntimeException if the bind fails or the calling thread is interrupted;
     *                          the original exception is preserved as the cause
     */
    public static void serverStart(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    // Use the NIO (non-blocking) server channel implementation.
                    .channel(NioServerSocketChannel.class)
                    // Size of the pending-connection (accept) queue.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Enable TCP keep-alive on accepted connections.
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    // Per-connection pipeline setup.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new MsgHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.addListener((ChannelFutureListener) bindResult -> {
                if (bindResult.isSuccess()) {
                    log.info("TcpServer启动成功,端口:{}", port);
                } else {
                    log.error("TcpServer启动失败,端口:{}", port);
                }
            });
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Shut down BOTH groups; the original released bossGroup twice and leaked workerGroup.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the TCP server on port 7311.
     *
     * <p>This call does not return while the server is running, because
     * {@link #serverStart(int)} waits for the server channel to close.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        serverStart(7311);
    }
}

2.信息处理器代码

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class MsgHandler extends ChannelInboundHandlerAdapter  {

    public final static Map<ChannelId, Channel> CHANNEL = new ConcurrentHashMap<>();
    private final static ChannelGroup channelGroups = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx)  {
        channelGroups.add(ctx.channel());
        SimpleDateFormat format = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
        ctx.channel().writeAndFlush(new TextWebSocketFrame(format.format(new Date()) + " 欢迎你的上线"));
        ChannelId id = ctx.channel().id();
        CHANNEL.put(id, ctx.channel());
        log.info("客服端:{} 上线了!",id);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 异常!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx)  {
        channelGroups.remove(ctx.channel());
        log.info("客服端:{} 断开连接!",ctx.channel().id());
        CHANNEL.remove(ctx.channel().id());
        ctx.close();

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ByteBuf byteBuf) {
            // 将 ByteBuf 转换为字符串
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            log.info("客服端:{} 发送消息:{}", ctx.channel().id(), message);
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer("服务端收到您发送的信息:" + message, CharsetUtil.UTF_8));
        } else {
            log.info("客服端:{} 发送未知类型的消息:{}", ctx.channel().id(), msg);
        }
    }


}


http://www.kler.cn/a/570849.html

相关文章:

  • 【mysql】有索引和没有索引字段更新时锁的不同
  • C# OnnxRuntime部署DAMO-YOLO香烟检测
  • Spring Boot 自动装配深度解析与实践指南
  • React 源码揭秘 | bailout策略Memo
  • 力扣每日一题——分割回文串
  • Skyeye 云智能制造办公系统 VUE 版本 v3.15.11 发布
  • 迷你世界脚本实体接口:Actor
  • Unity 接入本地部署的DeepSeek
  • pytest的bug
  • (十 九)趣学设计模式 之 中介者模式!
  • Leetcode 54: 螺旋矩阵
  • 大白话实战docker
  • 计算机基础面试(数据库)
  • 基于springboot+vue3图书借阅管理系统
  • Linux网络相关概念和重要知识(1)(网络协议、网络通信)
  • SEKI —— 基于大型语言模型的自进化与知识启发式神经架构搜索
  • SSM家谱管理系统
  • 蓝桥杯备考:动态规划入门题目之下楼梯问题
  • 华硕电脑开启电池保养模式的方法
  • 用Python+Flask打造可视化武侠人物关系图生成器:从零到一的实战全记录