当前位置: 首页 > article >正文

Spring + WebSocket

1. 简介

        WebSocket 是一种网络通信协议,提供了在单个TCP连接上进行全双工通信的能力。它允许服务器主动向客户端发送消息,而不需要客户端不断轮询服务器来检查更新。WebSocket 协议在2011年成为国际标准,并且被广泛用于实现实时通信功能,比如在线游戏、设备数据更新、实时聊天应用和股票行情更新等。

WebSocket 的主要特点包括:

  1. 全双工通信:客户端和服务器可以同时发送和接收数据,无需等待对方的响应。

  2. 持久连接:一旦WebSocket连接建立,它会保持开放状态,直到客户端或服务器决定关闭连接。

  3. 低延迟:由于不需要像HTTP那样每次通信都建立和关闭连接,WebSocket可以减少通信延迟。

  4. 头部开销小:WebSocket协议的头部比HTTP要小,这有助于减少数据传输的开销。

WebSocket 通信的建立通常遵循以下步骤:

  1. 握手:客户端通过发送一个特殊的HTTP请求来发起WebSocket连接,这个请求包含了Upgrade头部,表明客户端希望升级到WebSocket协议。

  2. 服务器响应:如果服务器支持WebSocket,它会响应一个HTTP响应,确认协议升级。

  3. 连接建立:一旦握手完成,客户端和服务器之间的连接就转变为WebSocket连接,双方可以开始发送数据。

WebSocket 使用ws(非加密)或wss(加密)作为URL协议前缀,例如:

  • ws://localhost/api/ws表示非加密的WebSocket连接。

  • wss://localhost/api/wss 表示加密的WebSocket连接。

2. Spring + React 实现ws通信

后端:SpringBoot

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置WebSocket:创建一个配置类来启用WebSocket并定义消息代理:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

这段代码是Spring Framework中用于配置WebSocket消息代理的Java配置类。它使用了Spring的WebSocketMessageBrokerConfigurer接口来定义WebSocket通信的端点和消息代理的行为。以下是对这段代码的详细解释:

  1. @Configuration 注解: 这个注解表明这是一个Spring配置类,Spring容器将会为这个类创建一个bean,并将其加入到应用上下文中。

  2. @EnableWebSocketMessageBroker 注解: 这个注解启用了WebSocket消息代理,允许使用高级消息传递功能,例如消息的订阅和发布。它还启用了STOMP协议的支持。

  3. implements WebSocketMessageBrokerConfigurer: 这个类实现了WebSocketMessageBrokerConfigurer接口,该接口允许配置WebSocket消息代理的细节。

  4. registerStompEndpoints(StompEndpointRegistry registry) 方法: 这个方法用于注册STOMP协议的端点。在这个配置中,它添加了一个端点/ws,并且指定使用SockJS协议。SockJS是一个浏览器JavaScript库,它提供了一个透明的、跨域的WebSocket兼容接口。

    1. registry.addEndpoint("/ws"):注册一个新的WebSocket端点/ws

    2. .withSockJS():指定这个端点使用SockJS协议。

  5. configureMessageBroker(MessageBrokerRegistry registry) 方法: 这个方法用于配置消息代理的行为。

    1. registry.enableSimpleBroker("/topic"):启用一个简单的消息代理,它将消息路由到以/topic为前缀的目的地。这意味着所有发送到/topic前缀的目的地的消息都将被广播给所有订阅了这些目的地的客户端。

    2. registry.setApplicationDestinationPrefixes("/app"):设置应用程序目的地前缀为/app。这意味着所有以/app为前缀的目的地都将被用作应用程序的端点,例如/app/sendMessage

通过这个配置,Spring Boot应用将能够处理WebSocket连接,并且可以使用STOMP协议发送和接收消息。客户端可以通过/ws端点连接到服务器,并且可以订阅以/topic为前缀的目的地来接收消息,或者发送消息到以/app为前缀的目的地。这个配置为构建基于WebSocket的实时通信应用提供了基础。

触发器:controller

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketController {

    @MessageMapping("/sendMessage")
    @SendTo("/topic/messages")
    public String processMessageFromClient(String message) {
        return "Server response: " + message;
    }
}

这段代码是一个Spring框架中的WebSocket控制器,使用了STOMP协议来处理WebSocket消息。下面是对这段代码的详细解释:

  1. @Controller 注解: 这个注解表明该类是一个WebSocket控制器,Spring将使用它来处理WebSocket相关的消息。

  2. @MessageMapping("/sendMessage") 注解: 这个注解定义了一个消息映射,它告诉Spring当接收到发送到/app/sendMessage(因为还有一个setApplicationDestinationPrefixes("/app")的配置)的WebSocket消息时,应该调用processMessageFromClient方法来处理这个消息。

  3. @SendTo("/topic/messages") 注解: 这个注解指定了处理方法返回的消息应该被发送到哪个目的地。在这个例子中,processMessageFromClient方法的返回值将被发送到/topic/messages,这意味着所有订阅了/topic/messages的客户端都将收到这个消息。

  4. processMessageFromClient 方法: 这个方法是实际处理客户端消息的方法。它接收一个字符串参数message,这个字符串是从客户端接收到的消息内容。方法返回一个字符串,这个字符串是服务器对客户端消息的响应。

    1. 方法参数:String message 是从客户端接收到的消息。

    2. 方法返回值:String 是服务器对客户端消息的响应,这里简单地在客户端消息前加上了前缀"Server response: "

当客户端通过WebSocket连接并发送消息到/app/sendMessage时,Spring将自动调用processMessageFromClient方法,并将客户端发送的消息作为参数传递给这个方法。方法处理完消息后,返回的响应将被发送到所有订阅了/topic/messages的客户端。

触发器:端点

使用@ServerEndpoint注解定义WebSocket端点:

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/ws")
public class WebSocketEndpoint {

    private static final Set<WebSocketEndpoint> endpoints = new CopyOnWriteArraySet<>();

    public WebSocketEndpoint() {
        endpoints.add(this);
    }

    @OnOpen
    public void onOpen(Session session) {
        System.out.println("New connection opened: " + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("Received message from " + session.getId() + ": " + message);
        for (WebSocketEndpoint endpoint : endpoints) {
            try {
                endpoint.sendMessage(session.getId(), message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnClose
    public void onClose(Session session) {
        System.out.println("Connection closed: " + session.getId());
        endpoints.remove(this);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("Error in connection: " + session.getId());
        throwable.printStackTrace();
        endpoints.remove(this);
    }

    public void sendMessage(String sessionId, String message) throws IOException {
        Session session = sessions.get(sessionId);
        if (session != null) {
            session.getBasicRemote().sendText(message);
        }
    }
}

使用@ServerEndpoint注解来定义WebSocket端点,而不是通过控制器层的处理器。这样,当客户端连接到/ws端点时,会自动触发WebSocketEndpoint实例的onOpen方法,从而建立WebSocket连接。

  • @ServerEndpoint("/ws"): 这是一个注解,用于定义WebSocket的端点。在这个例子中,客户端可以通过访问/ws路径来建立WebSocket连接。

  • private static final Set<WebSocketEndpoint> endpoints: 这是一个静态的Set集合,用于存储所有活动的WebSocketEndpoint实例。它使用CopyOnWriteArraySet,这是一种线程安全的变体,适用于读多写少的场景。

  • public WebSocketEndpoint(): 这是WebSocketEndpoint类的构造函数。在创建新的实例时,它会将自己添加到endpoints集合中。

  • @OnOpen public void onOpen(Session session): 这个方法在新的WebSocket会话打开时被调用。Session对象代表与客户端的连接,可以用于发送消息和关闭连接。在这个方法中,通常可以执行一些初始化操作,比如打印日志或将用户信息与会话关联。

  • @OnMessage public void onMessage(String message, Session session): 这个方法在服务器接收到客户端发送的消息时被调用。它接收消息内容和与之关联的Session对象。在这个例子中,服务器接收到消息后,会将消息打印到控制台,并将相同的消息广播给所有连接的客户端。

  • @OnClose public void onClose(Session session): 这个方法在WebSocket会话关闭时被调用。在这里,可以执行一些清理操作,比如从endpoints集合中移除当前的WebSocketEndpoint实例。

  • @OnError public void onError(Session session, Throwable throwable): 这个方法在WebSocket会话发生错误时被调用。它接收与错误关联的Session对象和一个Throwable对象,后者包含了错误的详细信息。在这里,可以处理错误,比如打印堆栈跟踪或从endpoints集合中移除当前的WebSocketEndpoint实例。

  • public void sendMessage(String sessionId, String message) throws IOException: 这个方法用于向特定的会话发送消息。它接收会话ID和要发送的消息内容。方法内部,它查找与给定会话ID关联的Session对象,并使用BasicRemote对象发送消息。如果会话不存在或发生IO异常,可能会抛出IOException

前端:React

安装SockJS和STOMP客户端:使用npm安装SockJS和STOMP客户端库:

npm install sockjs-client stompjs

创建WebSocket逻辑:在React组件中,使用SockJS和STOMP客户端连接到后端WebSocket服务器,并发送/接收消息。

import React, { useEffect, useState } from 'react';
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';

function App() {
    const [messages, setMessages] = useState([]);
    const [inputValue, setInputValue] = useState('');

    useEffect(() => {
        const socket = new SockJS('http://localhost:8080/ws');
        const stompClient = Stomp.over(socket);

        stompClient.connect({}, frame => {
            stompClient.subscribe('/topic/messages', message => {
                const messageBody = JSON.parse(message.body);
                setMessages(prevMessages => [...prevMessages, messageBody]);
            });
        });

        const sendMessage = () => {
            stompClient.send("/app/sendMessage", {}, inputValue);
            setInputValue(''); // Clear input field after sending
        };

        return () => {
            stompClient.disconnect();
        };
    }, [inputValue]);

    return (
        <div>
            <h1>WebSocket Messages</h1>
            <ul>
                {messages.map((message, index) => (
                    <li key={index}>{message}</li>
                ))}
            </ul>
            <input
                value={inputValue}
                onChange={e => setInputValue(e.target.value)}
                placeholder="Type a message"
            />
            <button onClick={sendMessage}>Send</button>
        </div>
    );
}

export default App;
  • 这个钩子用于处理组件挂载和卸载时的逻辑。

  • 当组件挂载时(useEffect无依赖或依赖项变化时),它创建一个SockJS连接和一个Stomp客户端。

  • stompClient.connect用于建立与服务器的STOMP连接。连接成功后,它订阅/topic/messages主题,以便接收服务器发送的消息。

  • setMessages用于更新状态,将新接收的消息添加到messages数组中。

  • sendMessage函数用于发送消息到服务器。它将inputValue发送到/app/sendMessage目的地,并在发送后清空输入框。

3. 实际业务

        一批设备通过mqtt传输消息,前端页面需要显示设备实时信息,如天气、风力、速度等。业务逻辑可以为,后端代码接收到设备通过mqtt传输的消息,将其一定量转化,然后通过ws协议将数据推送,最后前端拿到数据显示。

1. 定义端点

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageConfiguration implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private AuthPrincipalHandler authPrincipalHandler;

    @Autowired
    private WebSocketDefaultFactory webSocketDefaultFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Set the WebSocket connection address
        registry.addEndpoint("/api/v1/ws").setAllowedOriginPatterns("*")
                .setHandshakeHandler(authPrincipalHandler);
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.addDecoratorFactory(webSocketDefaultFactory);
        registry.setTimeToFirstMessage(60000 * 60 * 24 * 10);
    }

}
  1. implements WebSocketMessageBrokerConfigurer:这个类实现了WebSocketMessageBrokerConfigurer接口,该接口允许配置WebSocket消息代理的细节。

  2. AuthPrincipalHandler authPrincipalHandler:这是一个自定义的握手处理器,用于在WebSocket连接建立之前进行一些认证操作。

  3. WebSocketDefaultFactory webSocketDefaultFactory:这是一个自定义的装饰器工厂,用于添加一些默认的配置或处理逻辑。

  4. registerStompEndpoints(StompEndpointRegistry registry):这个方法用于注册STOMP协议的端点。在这个例子中,端点被设置为/api/v1/ws,并且允许来自任何源的连接(setAllowedOriginPatterns("*"))。同时,设置了自定义的握手处理器authPrincipalHandler

  5. configureWebSocketTransport(WebSocketTransportRegistration registry):这个方法用于配置WebSocket传输层的一些设置。在这个例子中,它添加了一个自定义的装饰器工厂webSocketDefaultFactory,并且设置了首次消息时间(timeToFirstMessage)为10天。

2. 消息服务

@Service
@Slf4j
public class SendMessageServiceImpl implements ISendMessageService {

    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private IWebSocketManageService webSocketManageService;

    @Override
    public void sendMessage(ConcurrentWebSocketSession session, CustomWebSocketMessage message) {
        if (session == null) {
            return;
        }

        try {
            if (!session.isOpen()) {
                session.close();
                log.debug("This session is closed.");
                return;
            }


            session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message)));
        } catch (IOException e) {
            log.info("Failed to publish the message. {}", message.toString());
            e.printStackTrace();
        }
    }

    @Override
    public void sendBatch(Collection<ConcurrentWebSocketSession> sessions, CustomWebSocketMessage message) {
        if (sessions.isEmpty()) {
            return;
        }

        try {

            TextMessage data = new TextMessage(mapper.writeValueAsBytes(message));

            for (ConcurrentWebSocketSession session : sessions) {
                if (!session.isOpen()) {
                    session.close();
                    log.debug("This session is closed.");
                    return;
                }
                session.sendMessage(data);
            }

        } catch (IOException e) {
            log.info("Failed to publish the message. {}", message.toString());

            e.printStackTrace();
        }
    }

    @Override
    public void sendBatch(String workspaceId, Integer userType, String bizCode, Object data) {
        if (!StringUtils.hasText(workspaceId)) {
            throw new RuntimeException("Workspace ID does not exist.");
        }
        Collection<ConcurrentWebSocketSession> sessions = Objects.isNull(userType) ?
                webSocketManageService.getValueWithWorkspace(workspaceId) :
                webSocketManageService.getValueWithWorkspaceAndUserType(workspaceId, userType);

        this.sendBatch(sessions, CustomWebSocketMessage.builder()
                        .data(data)
                        .timestamp(System.currentTimeMillis())
                        .bizCode(bizCode)
                        .build());
    }

    @Override
    public void sendBatch(String workspaceId, String bizCode, Object data) {
        this.sendBatch(workspaceId, null, bizCode, data);
    }
}

底层消息发送是通过ConcurrentWebSocketSessionDecorator类中的sendMessage方法

public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator
{
public void sendMessage(WebSocketMessage<?> message) throws IOException {
    if (!this.shouldNotSend()) {
        this.buffer.add(message);
        this.bufferSize.addAndGet(message.getPayloadLength());
        if (this.preSendCallback != null) {
            this.preSendCallback.accept(message);
        }

        do {
            if (!this.tryFlushMessageBuffer()) {
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("Another send already in progress: session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", this.getId(), this.getTimeSinceSendStarted(), this.getBufferSize()));
                }

                this.checkSessionLimits();
                break;
            }
        } while(!this.buffer.isEmpty() && !this.shouldNotSend());

    }
}
}

3. 使用

@Autowired
private SendMessageServiceImpl sendMessageService;

@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_HMS)
public void handleHms(CommonTopicReceiver receiver, MessageHeaders headers) {
    List<DeviceHmsDTO> unReadList = new ArrayList<>();
    objectMapper.convertValue(((Map) (receiver.getData())).get(MapKeyConst.LIST),
            new TypeReference<List<DeviceHmsReceiver>>() {
            })
            .forEach(hmsReceiver -> {
                final DeviceHmsEntity hms = entity.clone();
                this.fillEntity(hms, hmsReceiver);
                // The same unread hms are no longer incremented.
                if (hmsMap.contains(hms.getHmsKey())) {
                    return;
                }
                this.fillMessage(hms, hmsReceiver.getArgs());
                DockDevice dagDevice = this.dagBridge.getDockDevice(sn);
                if (dagDevice != null&&hms.getLevel()==2) {
                    dagDevice.drone.AlertInfoReport(hms.getMessageZh());
                }
                unReadList.add(entity2Dto(hms));
                mapper.insert(hms);
            });

    sendMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
            BizCodeEnum.DEVICE_HMS.getCode(),
            TelemetryDTO.<List<DeviceHmsDTO>>builder().sn(sn).host(unReadList).build());
}

        在这个消息通道中有消息,接收消息转换并将其发送到http://localhost:1234/api/ws。前端代码订阅该主题即可获取响应消息。

注:有没有发现这个很像发布/订阅模型?


http://www.kler.cn/news/363363.html

相关文章:

  • 简单的 curl HTTP的POSTGET请求以及ip port连通性测试
  • pytorch dataloader学习
  • docker 和 containerd 关系
  • 【SPIE独立出版 | 往届会后3个半月检索】第四届电子信息工程与数据处理国际学术会议(EIEDP 2025)
  • 数据治理(1)-数据规划
  • TiDB替换Starrocks:业务综合宽表迁移的性能评估与降本增效决策
  • SparkSQL整合Hive
  • Java集合(2 :List)
  • 【数据库】postgres数据库命令
  • 【树莓派 5B】Python 版本切换
  • C++之《剑指offer》学习记录(6):unordered_set和unordered_map
  • Proteus8使用教程
  • 如何使用pycharm测试自己的后端接口
  • 使用.NET MAUI开发第一个安卓APP
  • Fine-tuning 和 LoRA 和 QLoRA的区别
  • 常用于OBD系统的单端K总线收发器芯片资料:CSM9241
  • 【学习笔记】RFID
  • Facebook网页版登录不了是什么原因?如何解决?
  • Jtti:服务器GPU占用率过高是好事还是坏事?
  • 数字三角形模型
  • Vue前端开发:单向数据绑定
  • 中信银行深化ESG理念 以金融高质量发展助力金融强国建设
  • asp.net core mvc发布时输出视图文件Views
  • CSP-J复赛集训200-300分(5):[CSP-J 2021] 插入排序
  • 【计算机网络】HTTP报文详解,HTTPS基于HTTP做了哪些改进?(面试经典题)
  • vue3学习记录-自定义指令