当前位置: 首页 > article >正文

如何在 WebSocketHandler 中控制连接的断开

如何在 WebSocketHandler 中控制连接的断开

1. WebSocket 连接的生命周期

在 Spring WebFlux 中,WebSocket 连接的生命周期由 Mono<Void> 来控制。当 WebSocket 连接通过 ReactorNettyWebSocketClient 被发起时,Mono<Void> 表示 WebSocket 连接的生命周期,只有在连接被关闭时,Mono<Void> 才会完成。

client.execute() 返回的 Mono<Void>WebSocketHandler 返回的 Mono<Void> 是相同的,它们都是用于表示 WebSocket 连接的生命周期,表示 WebSocket 连接的打开与关闭。这两个 Mono 都是完成时,连接会关闭。

例如,client.execute() 返回的 Mono<Void> 会在连接关闭时完成:

public Mono<Void> connectToWebSocketServer() {
    return client.execute(URI.create("ws://example.com/socket"), session -> {
        // 发送消息
        Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));

        // 处理接收到的消息
        session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(System.out::println) // 打印收到的消息
            .subscribe();

        return send;
    });
}

2. client.execute()WebSocketHandler 返回的 Mono

client.execute()WebSocketHandler 返回的 Mono<Void> 是紧密关联的。client.execute() 方法会调用 WebSocketHandler 来处理与服务器的 WebSocket 连接,在执行 WebSocket 连接的过程中,Mono<Void> 会表示这个连接的生命周期。

  • Mono<Void> 表示连接的生命周期: Mono<Void> 在 WebSocket 连接成功建立时开始,直到连接关闭时完成。在 WebSocketHandler 中,我们通过 session.send() 来发送消息,通过 session.receive() 来接收消息。在这个过程中,Mono<Void> 会一直保持开启,直到连接关闭。

  • WebSocketHandler 的作用: WebSocketHandler 是用来处理 WebSocket 消息的核心组件,它是通过 client.execute() 与 WebSocket 服务器建立连接的。当连接成功建立时,Mono<Void> 开始执行,直到连接被主动关闭或由于某些异常关闭。

例如,在 WebSocketHandler 中,你可能会返回一个 Mono<Void> 来表示消息的发送和接收,最终决定什么时候关闭连接:

public Mono<Void> handle(WebSocketSession session) {
    // 发送消息到服务器
    Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));
    
    // 处理接收到的消息
    session.receive()
           .map(WebSocketMessage::getPayloadAsText)
           .doOnNext(System.out::println) // 打印收到的消息
           .subscribe();
    
    // 返回 Mono<Void>,连接将持续,直到 Mono 完成
    return send.then(session.close()); // 发送完消息后关闭连接
}

3. WebSocket 何时会关闭?

WebSocket 连接的关闭由以下几种情况引起:

  • Mono 结束: 如果 Mono<Void> 结束,WebSocket 连接会关闭。例如,当 session.send() 里的 Mono<Void> 完成后,WebSocket 连接就会关闭。

    Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));
    return send; // 发送完 Hello! 后,Mono<Void> 会结束,WebSocket 关闭
    
  • 服务器或客户端主动关闭: 服务器或客户端可以通过 session.close() 主动关闭 WebSocket 连接。

    return session.send(Flux.just(session.textMessage("Hello"))).then(session.close());
    
  • Flux 终止: 如果 session.send() 使用的是有限的 Flux(例如 Flux.just()),当消息发送完毕后,WebSocket 连接也会关闭。

    Flux<String> messages = Flux.just("Msg1", "Msg2", "Msg3"); // 发送3条消息
    return session.send(messages.map(session::textMessage)); // 发送完毕后,WebSocket 会关闭
    
  • 无限流不会自动关闭: 如果 Flux.interval() 被用于消息流,它不会自动终止,所以 WebSocket 连接不会自动关闭。为了让连接在一定时间后断开,可以使用 .take(n) 限制消息数量。

    Flux<String> messages = Flux.interval(Duration.ofSeconds(1))
                                 .map(i -> "Msg " + i)
                                 .take(10); // 只发送10条消息
    return session.send(messages.map(session::textMessage))
                  .then(session.close()); // 发送完10条后关闭连接
    

4. 如何主动控制 WebSocket 连接的断开?

如果你希望 WebSocket 在特定条件下主动断开,可以通过以下方式控制:

  • 使用 Mono<Void> 来控制连接关闭:Mono<Void> 完成时,连接会自动关闭。

    Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));
    return send.then(session.close()); // 发送完消息后手动关闭连接
    
  • 限制消息流: 你可以使用 take(n) 来限制发送的消息数量,达到一定数量后 WebSocket 连接会自动关闭。

    Flux<String> output = Flux.interval(Duration.ofSeconds(1))
                              .map(time -> "Server time: " + time)
                              .take(10);  // 只发送10条消息
    return session.send(output.map(session::textMessage))
                  .then(session.close()); // 发送完毕后手动关闭连接
    
  • 监听连接关闭: 可以通过 session.isOpen() 监听 WebSocket 是否已关闭,或者使用 doOnTerminate() 监听关闭事件。

    session.receive()
           .doOnTerminate(() -> System.out.println("WebSocket connection closed"))
           .subscribe();
    
  • 异常处理与重连: 使用 retryWhen()Flux.never() 来确保连接在断开后能够重连,或保持连接一直处于开放状态,直到明确关闭。

    client.execute(URI.create("ws://example.com/socket"), session -> {
        return session.send(Flux.interval(Duration.ofSeconds(1))
                                .map(i -> session.textMessage("Ping " + i)))
                      .then();
    }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))) // 断开后自动重连
    .subscribe();
    
    return session.receive().then(Flux.never()); // 永不结束,保持 WebSocket 连接
    

5. 总结

client.execute() 返回的 Mono<Void>WebSocketHandler 返回的 Mono<Void> 是同一个概念,它们表示 WebSocket 连接的生命周期。只有在连接关闭时,这些 Mono 才会完成。

通过 Mono<Void>WebSocketHandler,你可以灵活地控制 WebSocket 连接的打开、消息的发送与接收,以及连接的关闭。


http://www.kler.cn/a/573130.html

相关文章:

  • 016.3月夏令营:数理类
  • js 之 lodash函数库 的下载与基础使用
  • 深圳区域、人口、地铁线
  • Amorphous Data如何在DigitalOcean上构建智能、经济的AI Agent
  • Matlab实现车牌识别
  • 【AI神经网络与人脑神经系统的关联及借鉴分析】
  • 数据结构(四)栈和队列
  • python代码注释方式
  • (数据结构)双向链表
  • 自己的网页加一个搜索框,调用deepseek的API
  • 2025-03-04 学习记录--C/C++-PTA 习题5-3 使用函数计算两点间的距离
  • JetBrains学生申请
  • 最新Spring Security实战教程(一)初识Spring Security安全框架
  • 【Python · Pytorch】Conda介绍 DGL-cuda安装
  • 如何配置虚拟机连接finalshell并克隆
  • AI---DevOps常备工具(‌AI-Integrated DevOps Essential Tools)
  • NO.24十六届蓝桥杯备战|二维数组八道练习|杨辉三角|矩阵(C++)
  • 力扣-字符串
  • 设计模式|策略模式 Strategy Pattern 详解
  • EIF加载---虚拟物理地址加载