Web 开发 —— 高阶 WebSocket 和 SSE
说明
随着大语言模型的流行,因为使用了 SSE 或者 Websocket的技术进行流式的交换,使得 SSE 和 Websocket也火了起来,今天我们来讲解,如何在 Solon 中实现 Websocket 服务端和 SSE 服务端。
Websocket
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
我们这里通过一个简易的 im 的服务做为例子。
依赖
如果使用了solon-web,默认使用的是smart-http,已经集成了 websocket,不需要添加其他的依赖。
插件 | 适配框架 | 包大小 | 信号协议支持 | 端口 |
---|---|---|---|---|
solon-boot-smarthttp | smart-http (aio) | 0.4Mb | http, ws | 相同端口 |
solon-boot-jetty + solon-boot-jetty-add-websocket | jetty (nio) | 1.9Mb | http, ws | 相同端口 |
solon-boot-undertow | undertow (nio) | 4.3Mb | http, ws | 相同端口 |
solon-boot-websocket | websocket (nio) | 0.4Mb | ws | 独立端口 |
solon-boot-websocket-netty | netty (nio) | ws | 独立端口 |
独立的 WebSocket 插件,会使用独立的端口,且默认为:主端口 + 10000,但都可以通过配置进行修改。
启用
在主类开启 Websocket。
package com.example.demo.web;
import org.noear.solon.Solon;
import org.noear.solon.annotation.SolonMain;
/**
* @author airhead
*/
@SolonMain
public class DemoWeb05App {
public static void main(String[] args) {
Solon.start(
DemoWeb05App.class,
args,
app -> {
app.enableWebSocket(true);
});
}
}
开放端点
这里集成了 SimpleWebSocketListener,已经提供了基本的实现,可以根据实际的需要实现需要的接口。
package com.example.demo.web.im.endpoint;
import com.example.demo.web.im.service.ImService;
import java.io.IOException;
import org.noear.solon.annotation.Inject;
import org.noear.solon.net.annotation.ServerEndpoint;
import org.noear.solon.net.websocket.WebSocket;
import org.noear.solon.net.websocket.listener.SimpleWebSocketListener;
/**
* @author airhead
*/
@ServerEndpoint("/im.ws")
public class ImWebsocket extends SimpleWebSocketListener {
@Inject private ImService service;
@Override
public void onOpen(WebSocket socket) {
service.onOpen(socket);
}
@Override
public void onMessage(WebSocket socket, String text) throws IOException {
service.onMessage(socket, text);
}
}
实现逻辑
此处只是做简单的鉴权,和消息的回复,如果要实现完整的 IM 服务,还需要做会话的管理的逻辑。
package com.example.demo.web.im.service;
import java.io.IOException;
import org.dromara.hutool.core.text.StrUtil;
import org.noear.solon.annotation.Component;
import org.noear.solon.net.websocket.WebSocket;
/**
* @author airhead
*/
@Component
public class ImService {
public void onOpen(WebSocket socket) {
String token = socket.param("token");
if (StrUtil.isBlank(token)) {
socket.close();
}
// 省略了管理 socket 的管理
}
public void onMessage(WebSocket socket, String text) throws IOException {
socket.send("> " + text + "\r\n" + "消息已阅");
}
}
验证
SSE
SSE 全称是 Server-Sent Event,网页自动获取来自服务器的更新,SSE 是单向消息传递。
我们这里通过一个简易的 LLM 的服务做为例子。
依赖
要实现 SSE 需要引入 solon-web-sse。
dependencies {
implementation platform(project(":demo-parent"))
implementation("org.noear:solon-web")
implementation("org.noear:solon-web-sse")
}
开放端点
客户端先要调用 open 方法,初始化 SseEmitter 连接,之后就可以通过 SseEmitter 给客户端发送消息了,为了方便测试这里还提供了 send 方法。
package com.example.demo.web.chat.controller;
import com.example.demo.web.chat.service.ChatService;
import org.noear.solon.annotation.Controller;
import org.noear.solon.annotation.Inject;
import org.noear.solon.annotation.Mapping;
import org.noear.solon.web.sse.SseEmitter;
/**
* @author airhead
*/
@Controller
@Mapping("/chat")
public class ChatController {
@Inject private ChatService service;
@Mapping(value = "/open/{id}")
public SseEmitter open(String id) {
return service.open(id);
}
@Mapping("/send/{id}")
public String send(String id) {
return service.send(id);
}
@Mapping("/close/{id}")
public String close(String id) {
return service.close(id);
}
}
实现逻辑
与 Websocket 不同的是,SSE 是单向连接,请求的过程是不带会话连接的,所以需要自己管理好会话。
package com.example.demo.web.chat.service;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.noear.snack.ONode;
import org.noear.solon.Utils;
import org.noear.solon.annotation.Component;
import org.noear.solon.core.handle.Result;
import org.noear.solon.web.sse.SseEmitter;
import org.noear.solon.web.sse.SseEvent;
/**
* @author airhead
*/
@Component
@Slf4j
public class ChatService {
static Map<String, SseEmitter> emitterMap = new HashMap<>();
public SseEmitter open(String id) {
SseEmitter sseEmitter =
new SseEmitter(60 * 1000L)
.onCompletion(() -> emitterMap.remove(id))
.onError(e -> log.error("初始化 sse 错误", e))
.onInited(s -> emitterMap.put(id, s));
try {
sseEmitter.send("Ok");
} catch (IOException e) {
log.error("发送 sse 错误", e);
throw new RuntimeException(e);
}
// 初始化后,才能使用
return sseEmitter;
}
public String send(String id) {
SseEmitter emitter = emitterMap.get(id);
if (emitter == null) {
return "No user: " + id;
}
String msg = "test msg -> " + System.currentTimeMillis();
System.out.println(msg);
try {
emitter.send(msg);
// reconnectTime 用于提示前端重连时间
emitter.send(new SseEvent().id(Utils.guid()).data(msg).reconnectTime(1000L));
emitter.send(ONode.stringify(Result.succeed(msg)));
} catch (IOException e) {
log.error("发送 sse 错误", e);
throw new RuntimeException(e);
}
return "Ok";
}
public String close(String id) {
SseEmitter emitter = emitterMap.get(id);
if (emitter != null) {
emitter.complete();
}
return "Ok";
}
}
验证
ApiFox 支持测试 SSE,连接成功之后就可以调用 send 方法。
也可以直接使用 curl 进行测试