一、简单版本
package com.ptc.ai.box.biz.relay.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
/**
* https://blog.csdn.net/a1053765496/article/details/130701218
* 基于Netty快速实现WebSocket客户端,不手动处理握手
*/
@Slf4j
public class SimpleWsClient {
final CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
SimpleWsClient client = new SimpleWsClient();
client.test();
}
public void test() throws Exception {
Channel dest = dest();
latch.await();
dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));
}
public Channel dest() throws Exception {
final URI webSocketURL = new URI("wss://api.openai.com:443/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
httpHeaders.add("Authorization", "");
httpHeaders.add("OpenAI-Beta", "realtime=v1");
/*httpHeaders.add("Sec-WebSocket-Version", "13");
httpHeaders.add("Upgrade", "websocket");
httpHeaders.add("Connection", "Upgrade");*/
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap boot = new Bootstrap();
boot.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory
.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, httpHeaders)));
pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
System.err.println(" 客户端收到消息======== " + msg.text());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE
.equals(evt)) {
log.info(ctx.channel().id().asShortText() + " 握手完成!");
latch.countDown();
send(ctx.channel());
}
super.userEventTriggered(ctx, evt);
}
});
}
});
ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();
return cf.channel();
}
public static void send(Channel channel) {
final String textMsg = "握手完成后直接发送的消息";
if (channel != null && channel.isActive()) {
TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);
channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isDone() && channelFuture.isSuccess()) {
log.info(" ================= 发送成功.");
} else {
channelFuture.channel().close();
log.info(" ================= 发送失败. cause = " + channelFuture.cause());
channelFuture.cause().printStackTrace();
}
});
} else {
log.error("消息发送失败! textMsg = " + textMsg);
}
}
}
二、结构化版本
package com.ptc.ai.box.biz.relay.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
/**
 * Console-driven WebSocket client: connects, performs the handshake through
 * {@link WebSocketClientHandler}, then forwards each line read from standard input as a frame.
 *
 * <p>References:
 * https://gitcode.com/ddean2009/learn-netty4/tree/master/src/main/java/com/flydean25/socketclient
 * https://www.flydean.com/25-netty-websocket-client
 * https://blog.csdn.net/superfjj/article/details/120648434
 * https://blog.csdn.net/twypx/article/details/84543518
 *
 * <p>NOTE(review): the default URL uses the {@code wss} scheme, yet the pipeline built below
 * contains no TLS handler; one most likely has to be added in front of {@link HttpClientCodec}
 * for {@code wss} endpoints - confirm.
 */
public final class NettyWsClient {
    /** Target endpoint; overridable with {@code -Durl=...}. */
    static final String URL = System.getProperty("url", "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");

    /**
     * Runs the interactive client. Console input {@code 再见} sends a close frame and exits,
     * {@code ping} sends a ping frame, anything else is sent as a text frame. End of input exits.
     *
     * @param args unused
     * @throws Exception if the URL is malformed, or connecting / handshaking fails
     */
    public static void main(String[] args) throws Exception {
        URI uri = new URI(URL);
        // Honour an explicit port in the (overridable) URL instead of always dialing 443.
        final int port = resolvePort(uri);
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            // Empty value in the original; presumably a placeholder for the real credential.
            httpHeaders.add("Authorization", "");
            httpHeaders.add("OpenAI-Beta", "realtime=v1");
            WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
                    .newHandshaker(uri, WebSocketVersion.V13, null, true, httpHeaders));
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                }
            });
            Channel ch = b.connect(uri.getHost(), port).sync().channel();
            // Block until the WebSocket handshake has succeeded (or rethrow its failure).
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("再见".equalsIgnoreCase(msg)) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equalsIgnoreCase(msg)) {
                    WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
                    ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Returns the port to connect to: the URI's explicit port if present, otherwise 80 for the
     * {@code ws} scheme and 443 for everything else (which keeps the previous default).
     */
    private static int resolvePort(URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "ws".equalsIgnoreCase(uri.getScheme()) ? 80 : 443;
    }
}
package com.ptc.ai.box.biz.relay.client;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
 * Inbound handler that drives the WebSocket opening handshake manually and, once it has
 * completed, logs incoming text / pong / close frames.
 *
 * <p>Callers obtain {@link #handshakeFuture()} after the handler has been added to a pipeline
 * and wait on it before sending frames.
 */
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private final WebSocketClientHandshaker handshaker;
    /** Completed when the handshake succeeds or fails; created in {@link #handlerAdded}. */
    private ChannelPromise handshakeFuture;

    /**
     * @param handshaker the handshaker used to start and finish the opening handshake
     */
    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @return the future tracking the handshake; {@code null} until the handler has been added
     *         to a pipeline
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    /** Starts the handshake as soon as the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("channelActive, 进行handshake");
        handshaker.handshake(ctx.channel());
    }

    /**
     * Fails the handshake future if the connection drops before the handshake finished, so that
     * threads blocked on {@link #handshakeFuture()} do not wait forever.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("channelInactive!");
        // tryFailure is a no-op when the handshake has already completed.
        handshakeFuture.tryFailure(
                new IllegalStateException("channel closed before the WebSocket handshake completed"));
    }

    /**
     * Completes the handshake with the first response; afterwards logs WebSocket frames and
     * closes the channel on a close frame.
     *
     * @throws IllegalStateException if an HTTP response arrives after the handshake completed
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                log.info("websocket Handshake 完成!");
                handshakeFuture.trySuccess();
            } catch (WebSocketHandshakeException e) {
                log.info("websocket连接失败!");
                handshakeFuture.tryFailure(e);
            }
            return;
        }

        log.info("channelRead0.......");
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
                    + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            log.info("接收到TXT消息: {}", textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            log.info("接收到pong消息");
        } else if (frame instanceof CloseWebSocketFrame) {
            log.info("接收到closing消息");
            ch.close();
        }
        // Other frame types are ignored, as in the original.
    }

    /** Logs the error, fails a still-pending handshake with it and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("出现异常", cause);
        handshakeFuture.tryFailure(cause);
        ctx.close();
    }
}
三、ChannelInitializer解耦合版本
package com.ptc.ai.box.biz.relay.client;
import com.ptc.ai.box.biz.relay.client.handler.TargetServerHandler;
import com.ptc.ai.box.biz.relay.client.handler.WebSocketClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import java.net.URI;
/**
 * Thin wrapper that connects a Netty channel configured by {@link WebSocketClientInitializer}
 * and exposes a method for sending text frames on it.
 *
 * <p>Created 4/11/24 17:52.
 *
 * <p>Threading: {@link #start()} blocks its caller until the connection closes, so
 * {@link #sendMessage(String)} can only be useful when invoked from a different thread while
 * {@code start()} is still running.
 */
public class NettyClient {
    /** Remote host passed to {@code Bootstrap.connect}; named {@code url} but used as a host name. */
    private final String url;
    private final int port;
    /**
     * Written by the thread inside {@link #start()} and read by callers of
     * {@link #sendMessage(String)} on other threads, hence volatile.
     */
    private volatile Channel channel;

    /**
     * @param url  remote host to connect to
     * @param port remote port to connect to
     */
    public NettyClient(String url, int port) {
        this.url = url;
        this.port = port;
    }

    /**
     * Connects and then blocks until the channel is closed; the event loop group is shut down
     * before returning.
     *
     * @throws Exception if the connection attempt fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new WebSocketClientInitializer());
            ChannelFuture future = bootstrap.connect(url, port).sync();
            Channel connected = future.channel();
            this.channel = connected;
            connected.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Sends {@code message} as a text frame if a channel exists and is active; otherwise the
     * message is dropped silently.
     *
     * @param message text payload to send
     */
    public void sendMessage(String message) {
        // Read the volatile field once so the null check and the write use the same channel.
        Channel current = channel;
        if (current != null && current.isActive()) {
            current.writeAndFlush(new TextWebSocketFrame(message));
        }
    }

    public static void main(String[] args) throws Exception {
        NettyClient client = new NettyClient("api.openai.com", 443);
        client.start();
        // Send a test message.
        // NOTE(review): start() returns only after the channel has closed, so this call always
        // finds the channel inactive and sends nothing. A working demo has to send from another
        // thread, or from a pipeline handler, after the WebSocket handshake has completed.
        client.sendMessage("Hello WebSocket!");
    }
}
package com.ptc.ai.box.biz.relay.client.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.URI;
/**
 * Pipeline initializer for the outbound ("target") WebSocket connection: HTTP codec and
 * aggregation, chunked writes, the WebSocket client protocol handler and finally
 * {@link TargetServerHandler}.
 *
 * <p>Created 4/11/24 18:01.
 *
 * <p>NOTE(review): the handshake URI uses the {@code wss} scheme, yet no TLS handler is added
 * to the pipeline here; one most likely has to precede {@link HttpClientCodec} - confirm.
 * NOTE(review): the maximum frame payload length is {@code Integer.MAX_VALUE}, i.e. effectively
 * unbounded - confirm this is intended.
 */
@Slf4j
@Component
public class WebSocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        log.info("target initChannel......");
        final String url = "api.openai.com";
        final URI endpoint = URI.create("wss://" + url + "/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01");

        // Headers sent with the opening handshake request.
        DefaultHttpHeaders handshakeHeaders = new DefaultHttpHeaders();
        handshakeHeaders.add("Authorization", "");
        handshakeHeaders.add("OpenAI-Beta", "realtime=v1");

        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                endpoint, WebSocketVersion.V13, null, false, handshakeHeaders, Integer.MAX_VALUE);

        // Idle detection is currently disabled. The original comment spoke of a heartbeat after
        // 10s without reads and closing after 5s without writes; the disabled line used 180 / 60:
        //ch.pipeline().addLast(new IdleStateHandler(180, 60, 0, TimeUnit.SECONDS));
        ch.pipeline().addLast(
                new HttpClientCodec(),
                new HttpObjectAggregator(6555360),
                new ChunkedWriteHandler(),
                new WebSocketClientProtocolHandler(handshaker),
                new TargetServerHandler());
    }
}
package com.ptc.ai.box.biz.relay.client.handler;
import com.ptc.ai.box.biz.relay.server.handler.DataChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * Last inbound handler on the target-server channel: logs lifecycle events and text frames
 * received from the target server.
 *
 * <p>Created 4/11/24 18:02.
 */
@Slf4j
@Component
public class TargetServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /** Flushes any pending writes once a read batch has been processed. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("target channel read complete");
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("target channel inactive: channelId={}", ctx.channel().id());
    }

    /** Logs the failure; the channel is intentionally left open (closing is disabled). */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("target channel exception:", cause);
        //ctx.close();
    }

    /** Logs the remote IP address once the TCP connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("target channelActive......");
        InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = inetSocket.getAddress().getHostAddress();
        log.info(ip);
        /*JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg", "register");
        jsonObject.put("data", System.currentTimeMillis()+"");
        TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(jsonObject.toJSONString());
        ctx.writeAndFlush(textWebSocketFrame);*/
        /**
         * register router at userEventTriggered(HandshakeComplete)
         */
    }

    /**
     * Logs the received text and writes the frame back out on the same channel.
     *
     * <p>NOTE(review): the frame is written to the channel it arrived on, i.e. echoed to the
     * target server; confirm whether it should be forwarded to a different channel instead.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        log.info("Received from target server: {}", textWebSocketFrame.text());
        // SimpleChannelInboundHandler releases the frame when this method returns, and the
        // outbound path releases it again after writing. Retain once so the reference count
        // stays balanced and the write does not hit an already-released buffer.
        ctx.channel().writeAndFlush(textWebSocketFrame.retain());
    }
}