当前位置: 首页 > article >正文

Netty实现WebSocket Client三种典型方式

一、简单版本

package com.ptc.ai.box.biz.relay.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.util.concurrent.CountDownLatch;

/**
 * https://blog.csdn.net/a1053765496/article/details/130701218
 * 基于Netty快速实现WebSocket客户端,不手动处理握手
 */
@Slf4j
public class SimpleWsClient {

    final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        SimpleWsClient client = new SimpleWsClient();
        client.test();
    }

    public void test() throws Exception {
        Channel dest = dest();
        latch.await();
        dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
    }

    public Channel dest() throws Exception {
        final URI webSocketURL = new URI("wss://api.openai.com:443/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");

        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        httpHeaders.add("Authorization", "");
        httpHeaders.add("OpenAI-Beta", "realtime=v1");
        /*httpHeaders.add("Sec-WebSocket-Version", "13");
        httpHeaders.add("Upgrade", "websocket");
        httpHeaders.add("Connection", "Upgrade");*/

        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap boot = new Bootstrap();
        boot.option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .handler(new LoggingHandler(LogLevel.INFO))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        ChannelPipeline pipeline = sc.pipeline();
                        pipeline.addLast(new HttpClientCodec());
                        pipeline.addLast(new ChunkedWriteHandler());
                        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                        pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
                                .newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, httpHeaders)));
                        pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
                                    throws Exception {
                                System.err.println(" 客户端收到消息======== " + msg.text());
                            }

                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
                                        .equals(evt)) {
                                    log.info(ctx.channel().id().asShortText() + " 握手完成!");
                                    latch.countDown();
                                    send(ctx.channel());
                                }
                                super.userEventTriggered(ctx, evt);
                            }
                        });

                    }
                });

        ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();

        return cf.channel();
    }

    public static void send(Channel channel) {
        final String textMsg = "握手完成后直接发送的消息";

        if (channel != null && channel.isActive()) {
            TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
            channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
                if (channelFuture.isDone() && channelFuture.isSuccess()) {
                    log.info("     ================= 发送成功.");
                } else {
                    channelFuture.channel().close();
                    log.info("     ================= 发送失败. cause = " + channelFuture.cause());
                    channelFuture.cause().printStackTrace();
                }
            });
        } else {
            log.error("消息发送失败! textMsg = " + textMsg);
        }
    }

}

二、结构化版本

package com.ptc.ai.box.biz.relay.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

/**
 * https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient
 * https://www.flydean.com/25-netty-websocket-client
 * https://blog.csdn.net/superfjj/article/details/120648434
 * https://blog.csdn.net/twypx/article/details/84543518
 */
public final class NettyWsClient {

    static final String URL = System.getProperty("url", "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");

    public static void main(String[] args) throws Exception {
        URI uri = new URI(URL);
        final int port = 443;

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            httpHeaders.add("Authorization", "");
            httpHeaders.add("OpenAI-Beta", "realtime=v1");

            WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
                    .newHandshaker(uri, WebSocketVersion.V13, null, true, httpHeaders));

            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                }
            });

            Channel ch = b.connect(uri.getHost(), port).sync().channel();
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("再见".equalsIgnoreCase(msg)) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equalsIgnoreCase(msg)) {
                    WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
                    ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}
package com.ptc.ai.box.biz.relay.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Inbound handler that performs the WebSocket opening handshake manually with a
 * {@link WebSocketClientHandshaker} and then logs the frames it receives.
 *
 * <p>{@link #handshakeFuture()} completes successfully once the handshake is
 * finished, and fails when the handshake is rejected, an exception is caught,
 * or the channel goes inactive before the handshake is done.
 */
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private final WebSocketClientHandshaker handshaker;

    /** Created in {@link #handlerAdded(ChannelHandlerContext)}; {@code null} before that. */
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @return the future tracking the handshake outcome, or {@code null} if this
     *         handler has not been added to a pipeline yet
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("channelActive, 进行handshake");
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("channelInactive!");
        // If the peer drops the connection mid-handshake nobody else completes the
        // promise, and a caller blocked in handshakeFuture().sync() would hang forever.
        if (!handshakeFuture.isDone()) {
            handshakeFuture.tryFailure(
                    new IllegalStateException("channel closed before the WebSocket handshake completed"));
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                log.info("websocket Handshake 完成!");
                // try* variants: the promise may already have been completed elsewhere.
                handshakeFuture.trySuccess();
            } catch (WebSocketHandshakeException e) {
                log.info("websocket连接失败!");
                handshakeFuture.tryFailure(e);
            }
            return;
        }
        log.info("channelRead0.......");

        // After the upgrade only WebSocket frames are expected on this channel.
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
                    + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            log.info("接收到TXT消息: {}", textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            log.info("接收到pong消息");
        } else if (frame instanceof CloseWebSocketFrame) {
            log.info("接收到closing消息");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Log, fail a still-pending handshake, then drop the connection.
        log.error("出现异常", cause);
        if (!handshakeFuture.isDone()) {
            handshakeFuture.tryFailure(cause);
        }
        ctx.close();
    }
}

三、ChannelInitializer解耦合版本

package com.ptc.ai.box.biz.relay.client;

import com.ptc.ai.box.biz.relay.client.handler.TargetServerHandler;
import com.ptc.ai.box.biz.relay.client.handler.WebSocketClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;

import java.net.URI;

/**
 * Created by IntelliJ IDEA.
 *
 * @Author :
 * @create 4/11/24 17:52
 */

public class NettyClient {
    private final String url;
    private final int port;
    private Channel channel;

    public NettyClient(String url, int port) {
        this.url = url;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new WebSocketClientInitializer());
            ChannelFuture future = bootstrap.connect(url, port).sync();
            this.channel = future.channel();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public void sendMessage(String message) {
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(new TextWebSocketFrame(message));
        }
    }

    public static void main(String[] args) throws Exception {
        NettyClient client = new NettyClient("api.openai.com", 443);
        client.start();
        // 发送测试消息
        client.sendMessage("Hello WebSocket!");
    }
}

package com.ptc.ai.box.biz.relay.client.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.URI;

/**
 * Created by IntelliJ IDEA.
 *
 * @Author : 
 * @create 4/11/24 18:01
 */

@Slf4j
@Component
public class WebSocketClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        log.info("target initChannel......");
        String url = "api.openai.com";

        // 10秒未读发送心跳,5秒未写关闭连接
        //ch.pipeline().addLast(new IdleStateHandler(180, 60, 0, TimeUnit.SECONDS));
        ch.pipeline().addLast(new HttpClientCodec());
        ch.pipeline().addLast(new HttpObjectAggregator(6555360));
        ch.pipeline().addLast(new ChunkedWriteHandler());

        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        httpHeaders.add("Authorization", "");
        httpHeaders.add("OpenAI-Beta", "realtime=v1");
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                URI.create("wss://" + url +"/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"),
                WebSocketVersion.V13,
                null,
                false,
                httpHeaders, Integer.MAX_VALUE);
        ch.pipeline().addLast(new WebSocketClientProtocolHandler(handshaker));
        ch.pipeline().addLast(new TargetServerHandler());
    }
}
package com.ptc.ai.box.biz.relay.client.handler;

import com.ptc.ai.box.biz.relay.server.handler.DataChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * Terminal handler for text frames received from the target server: logs
 * lifecycle events and writes every received text frame back to the channel.
 *
 * @create 4/11/24 18:02
 */
@Slf4j
@Component
public class TargetServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("target channel read complete");
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("target channel inactive: channelId={}", ctx.channel().id());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The channel is intentionally left open; the exception is only logged.
        log.warn("target channel exception:", cause);
        //ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("target channelActive......");
        InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = inetSocket.getAddress().getHostAddress();
        log.info("{}", ip);
        /*JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg", "register");
        jsonObject.put("data", System.currentTimeMillis()+"");
        TextWebSocketFrame textWebSocketFrame =  new TextWebSocketFrame(jsonObject.toJSONString());
        ctx.writeAndFlush(textWebSocketFrame);*/
        // Registration is meant to happen in userEventTriggered on handshake
        // completion, not here.
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        log.info("Received from target server: {}", textWebSocketFrame.text());
        // SimpleChannelInboundHandler releases the frame when this method returns and
        // the write releases it again once it is done, so take an extra reference
        // before handing the frame to the outbound path.
        ctx.channel().writeAndFlush(textWebSocketFrame.retain());
    }
}


http://www.kler.cn/a/393451.html

相关文章:

  • 量化交易系统开发-实时行情自动化交易-3.4.1.2.A股交易数据
  • MySQL系列之如何在Linux只安装客户端
  • Nuxt 版本 2 和 版本 3 的区别
  • 数据库SQL——连接表达式(JOIN)图解
  • 【C#设计模式(8)——过滤器模式(Adapter Pattern)】
  • Linux kernel 堆溢出利用方法(二)
  • 【Springboot】黑马大事件笔记 day1
  • 【go从零单排】HTTP客户端和服务端
  • 群控系统服务端开发模式-应用开发-前端退出功能
  • 丹摩征文活动|FLUX.1 和 ComfyUI:从部署到上手,轻松驾驭!
  • apk反编译修改教程系列-----apk应用反编译中AndroidManifest.xml详细代码释义解析 包含各种权限 代码含义
  • CyclicBarrier复杂场景示例
  • ThinkServer SR658H V2服务器BMC做raid与装系统
  • TCP 为什么是流协议而不是包协议
  • SpringBoot框架在共享汽车管理中的应用
  • 使用elementUI实现表格行拖拽改变顺序,无需引入外部库
  • 基于SpringBoot智慧社区管理平台
  • 力扣(LeetCode)LCR 179. 查找总价格为目标值的两个商品(Java)
  • Fabric.js中文教程
  • 唐帕科技校园语音报警系统:通过关键词识别,阻止校园霸凌事件
  • 网络基础 - 网段划分篇
  • 嵌入式硬件杂谈(一)-推挽 开漏 高阻态 上拉电阻
  • CHI 协议层 Retry —— CHI(8)
  • 安科瑞工业绝缘监测装置:保障煤矿井下6kV供电系统安全运行的关键应用——安科瑞 丁佳雯
  • Java使用Thumbnails进行图片处理
  • 《C++跨平台编译:打破系统边界,释放代码潜能》