如何在 WebSocketHandler 中控制连接的断开
如何在 WebSocketHandler 中控制连接的断开
1. WebSocket 连接的生命周期
在 Spring WebFlux 中,WebSocket 连接的生命周期由 Mono<Void>
来控制。当 WebSocket 连接通过 ReactorNettyWebSocketClient
被发起时,Mono<Void>
表示 WebSocket 连接的生命周期,只有在连接被关闭时,Mono<Void>
才会完成。
client.execute()
返回的 Mono<Void>
与 WebSocketHandler
返回的 Mono<Void>
是相同的,它们都是用于表示 WebSocket 连接的生命周期,表示 WebSocket 连接的打开与关闭。这两个 Mono
都是完成时,连接会关闭。
例如,client.execute()
返回的 Mono<Void>
会在连接关闭时完成:
public Mono<Void> connectToWebSocketServer() {
return client.execute(URI.create("ws://example.com/socket"), session -> {
// 发送消息
Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));
// 处理接收到的消息
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println) // 打印收到的消息
.subscribe();
return send;
});
}
2. client.execute()
和 WebSocketHandler
返回的 Mono
client.execute()
和 WebSocketHandler
返回的 Mono<Void>
是紧密关联的。client.execute()
方法会调用 WebSocketHandler
来处理与服务器的 WebSocket 连接,在执行 WebSocket 连接的过程中,Mono<Void>
会表示这个连接的生命周期。
-
Mono<Void>
表示连接的生命周期:Mono<Void>
在 WebSocket 连接成功建立时开始,直到连接关闭时完成。在 WebSocketHandler 中,我们通过session.send()
来发送消息,通过session.receive()
来接收消息。在这个过程中,Mono<Void>
会一直保持开启,直到连接关闭。 -
WebSocketHandler 的作用:
WebSocketHandler
是用来处理 WebSocket 消息的核心组件,它是通过client.execute()
与 WebSocket 服务器建立连接的。当连接成功建立时,Mono<Void>
开始执行,直到连接被主动关闭或由于某些异常关闭。
例如,在 WebSocketHandler
中,你可能会返回一个 Mono<Void>
来表示消息的发送和接收,最终决定什么时候关闭连接:
public Mono<Void> handle(WebSocketSession session) {
// 发送消息到服务器
Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));
// 处理接收到的消息
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println) // 打印收到的消息
.subscribe();
// 返回 Mono<Void>,连接将持续,直到 Mono 完成
return send.then(session.close()); // 发送完消息后关闭连接
}
3. WebSocket 何时会关闭?
WebSocket 连接的关闭由以下几种情况引起:
-
Mono 结束: 如果
Mono<Void>
结束,WebSocket 连接会关闭。例如,当session.send()
里的Mono<Void>
完成后,WebSocket 连接就会关闭。Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!"))); return send; // 发送完 Hello! 后,Mono<Void> 会结束,WebSocket 关闭
-
服务器或客户端主动关闭: 服务器或客户端可以通过
session.close()
主动关闭 WebSocket 连接。return session.send(Flux.just(session.textMessage("Hello"))).then(session.close());
-
Flux 终止: 如果
session.send()
使用的是有限的Flux
(例如Flux.just()
),当消息发送完毕后,WebSocket 连接也会关闭。Flux<String> messages = Flux.just("Msg1", "Msg2", "Msg3"); // 发送3条消息 return session.send(messages.map(session::textMessage)); // 发送完毕后,WebSocket 会关闭
-
无限流不会自动关闭: 如果
Flux.interval()
被用于消息流,它不会自动终止,所以 WebSocket 连接不会自动关闭。为了让连接在一定时间后断开,可以使用.take(n)
限制消息数量。Flux<String> messages = Flux.interval(Duration.ofSeconds(1)) .map(i -> "Msg " + i) .take(10); // 只发送10条消息 return session.send(messages.map(session::textMessage)) .then(session.close()); // 发送完10条后关闭连接
4. 如何主动控制 WebSocket 连接的断开?
如果你希望 WebSocket 在特定条件下主动断开,可以通过以下方式控制:
-
使用
Mono<Void>
来控制连接关闭: 当Mono<Void>
完成时,连接会自动关闭。Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!"))); return send.then(session.close()); // 发送完消息后手动关闭连接
-
限制消息流: 你可以使用
take(n)
来限制发送的消息数量,达到一定数量后 WebSocket 连接会自动关闭。Flux<String> output = Flux.interval(Duration.ofSeconds(1)) .map(time -> "Server time: " + time) .take(10); // 只发送10条消息 return session.send(output.map(session::textMessage)) .then(session.close()); // 发送完毕后手动关闭连接
-
监听连接关闭: 可以通过
session.isOpen()
监听 WebSocket 是否已关闭,或者使用doOnTerminate()
监听关闭事件。session.receive() .doOnTerminate(() -> System.out.println("WebSocket connection closed")) .subscribe();
-
异常处理与重连: 使用
retryWhen()
或Flux.never()
来确保连接在断开后能够重连,或保持连接一直处于开放状态,直到明确关闭。client.execute(URI.create("ws://example.com/socket"), session -> { return session.send(Flux.interval(Duration.ofSeconds(1)) .map(i -> session.textMessage("Ping " + i))) .then(); }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))) // 断开后自动重连 .subscribe();
return session.receive().then(Flux.never()); // 永不结束,保持 WebSocket 连接
5. 总结
client.execute()
返回的 Mono<Void>
和 WebSocketHandler
返回的 Mono<Void>
是同一个概念,它们表示 WebSocket 连接的生命周期。只有在连接关闭时,这些 Mono
才会完成。
通过 Mono<Void>
和 WebSocketHandler
,你可以灵活地控制 WebSocket 连接的打开、消息的发送与接收,以及连接的关闭。