当前位置: 首页 > article >正文

spring cloud 使用 webSocket

1.引入依赖,(在微服务模块中)

<!-- Spring WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.新建文件

package com.ruoyi.foundation.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 */

@Configuration
public class WebSocketConfig {
    // 使用boot内置tomcat时需要注入此bean
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
package com.ruoyi.foundation.webSocket;

import lombok.extern.slf4j.Slf4j;

import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class WebsocketUtil {
    private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();

    /**
     * 添加session
     */
    public static void addSession(String userId, Session session){
        // 一个用户只允许一个session链接
        ONLINE_SESSION.putIfAbsent(userId, session);
        log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 移除session
     */
    public static void removeSession(String userId){
        ONLINE_SESSION.remove(userId);
        log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 给单个用户推送消息
     */
    public static void sendMessage(String userId, String message){
        Session session = ONLINE_SESSION.get(userId);
        if(session == null){
            log.warn("Session for user [{}] not found", userId);
            return;
        }
        sendMessage(session, message);
    }

    public static void sendMessage(Session session, String message) {
        if (session != null) {
            session.getAsyncRemote().sendText(message);
        }
    }

    /**
     * 给所有用户发消息
     */
    public static void sendMessageForAll(String message) {
        ONLINE_SESSION.forEach((userId, session) -> {
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }

    /**
     * 给指定的多个用户推送消息
     */
    public static void sendMessageForUsers(Set<String> userIds, String message) {
        userIds.forEach(userId -> {
            Session session = ONLINE_SESSION.get(userId);
            if (session == null) {
                log.warn("Session for user [{}] not found", userId);
                return;
            }
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }
}
package com.ruoyi.foundation.apicontroller;

import com.google.gson.Gson;
import com.ruoyi.foundation.apicontroller.req.MemorialHallWebsocketReq;
import com.ruoyi.foundation.webSocket.WebsocketUtil;
import io.seata.common.util.StringUtils;
import org.apache.poi.util.StringUtil;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Component
@ServerEndpoint(value = "/api/memorialHallWebsocket/{dailyMissId}/{userId}")
public class MmMemorialHallWebsocketController {
    /**
     * 保存每日一念纪念馆中当前在线的用户ID
     */
    private static final Map<String, List<String>> memorialHallUsers = new ConcurrentHashMap<>();

    private Gson gson=new Gson();

    @OnOpen
    public void onOpen(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
        WebsocketUtil.addSession(userId, session);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if (strings == null){
            List<String> list=new ArrayList<>();
            list.add(userId);
            memorialHallUsers.put(dailyMissId,list);
        }else{
            strings.add(userId);
        }
    }

    @OnClose
    public void onClose(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
        WebsocketUtil.removeSession(userId);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if(strings != null){
            strings.remove(userId);
        }
    }

    @OnMessage
    public void onMessage(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session, String message) {
        /*System.out.println(dailyMissId);
        System.out.println(userId);
        System.out.println(session);
        System.out.println(message);*/

        //MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if(strings == null || strings.isEmpty()){
            return;
        }

        Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, userId)).collect(Collectors.toSet());

        //对同纪念馆的在线用户进行广播
        WebsocketUtil.sendMessageForUsers(collect,message);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        try {
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        throwable.printStackTrace();
    }
}

3.网关允许WebScoket

- id: ruoyi-foundationWebSocket
    uri: lb:ws://ruoyi-foundation
    predicates:
        - Path=/foundationWebSocket/**
    filters:
        - StripPrefix=1

4.测试

5.线上nginx配置


    location /mmwzGateWay/ {
        if ($request_method = OPTIONS) {
            add_header Access-Control-Allow-Origin $http_origin;
            add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept";
            add_header Access-Control-Allow-Methods GET,POST,OPTIONS,HEAD,PUT,DELETE;
            add_header Access-Control-Allow-Credentials true;
            return 200;
        }
 
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

	   # WebSocket 相关的头部配置
	   proxy_http_version 1.1;
	   proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Forwarded-Proto $scheme;
        
        proxy_pass http://mmwz-gateway:8080/;
        #proxy_pass http://www.baidu.com/;
    }


http://www.kler.cn/a/542501.html

相关文章:

  • vue-点击生成动态值,动态渲染回显输入框
  • 深入解析 Kafka 消费者偏移量管理
  • 代码随想录day11
  • C++17 新特性解析
  • 微服务..
  • C语言学习笔记:子函数的调用实现各个位的累加和
  • SpringCloud - Gateway 网关
  • 常用电路(过压保护、电流/电压采集)
  • 开源AI智能名片2+1链动模式S2B2C商城小程序在实体店与线上营销中的应用探索
  • 教程 | MySQL 基本指令指南(附MySQL软件包)
  • 最新PHP盲盒商城系统源码 晒图+免签+短信验证+在线回收 ThinkPHP框架
  • MySQL——CRUD
  • Java爬虫:高效获取1688商品详情的“数字猎人”
  • 林语堂 | 生活的智慧在于逐渐澄清滤除那些不重要的杂质,而保留最重要的部分
  • AH比价格策略源代码
  • HALCON 数据结构
  • Vision Transformer:打破CNN垄断,全局注意力机制重塑计算机视觉范式
  • 青少年编程与数学 02-009 Django 5 Web 编程 04课题、应用创建
  • 本地部署的drawio绘图存储调研
  • 数据结构--迷宫问题
  • 在nodejs中使用RabbitMQ(三)Routing、Topics、Headers
  • Flink-DataStream API
  • Redis Sentinel(哨兵)模式介绍
  • 力扣动态规划-26【算法学习day.120】
  • DeepSeek API 调用 - Spring Boot 实现
  • 【经验分享】Linux 系统安装后内核参数优化