spring cloud 使用 webSocket
1.引入依赖,(在微服务模块中)
<!-- Spring WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.新建文件
package com.ruoyi.foundation.webSocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 开启WebSocket支持
*/
@Configuration
public class WebSocketConfig {
// 使用boot内置tomcat时需要注入此bean
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package com.ruoyi.foundation.webSocket;
import lombok.extern.slf4j.Slf4j;
import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WebsocketUtil {
private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();
/**
* 添加session
*/
public static void addSession(String userId, Session session){
// 一个用户只允许一个session链接
ONLINE_SESSION.putIfAbsent(userId, session);
log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());
}
/**
* 移除session
*/
public static void removeSession(String userId){
ONLINE_SESSION.remove(userId);
log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());
}
/**
* 给单个用户推送消息
*/
public static void sendMessage(String userId, String message){
Session session = ONLINE_SESSION.get(userId);
if(session == null){
log.warn("Session for user [{}] not found", userId);
return;
}
sendMessage(session, message);
}
public static void sendMessage(Session session, String message) {
if (session != null) {
session.getAsyncRemote().sendText(message);
}
}
/**
* 给所有用户发消息
*/
public static void sendMessageForAll(String message) {
ONLINE_SESSION.forEach((userId, session) -> {
CompletableFuture.runAsync(() -> sendMessage(session, message))
.exceptionally(ex -> {
log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
return null;
});
});
}
/**
* 给指定的多个用户推送消息
*/
public static void sendMessageForUsers(Set<String> userIds, String message) {
userIds.forEach(userId -> {
Session session = ONLINE_SESSION.get(userId);
if (session == null) {
log.warn("Session for user [{}] not found", userId);
return;
}
CompletableFuture.runAsync(() -> sendMessage(session, message))
.exceptionally(ex -> {
log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
return null;
});
});
}
}
package com.ruoyi.foundation.apicontroller;
import com.google.gson.Gson;
import com.ruoyi.foundation.apicontroller.req.MemorialHallWebsocketReq;
import com.ruoyi.foundation.webSocket.WebsocketUtil;
import io.seata.common.util.StringUtils;
import org.apache.poi.util.StringUtil;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Component
@ServerEndpoint(value = "/api/memorialHallWebsocket/{dailyMissId}/{userId}")
public class MmMemorialHallWebsocketController {
/**
* 保存每日一念纪念馆中当前在线的用户ID
*/
private static final Map<String, List<String>> memorialHallUsers = new ConcurrentHashMap<>();
private Gson gson=new Gson();
@OnOpen
public void onOpen(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
WebsocketUtil.addSession(userId, session);
List<String> strings = memorialHallUsers.get(dailyMissId);
if (strings == null){
List<String> list=new ArrayList<>();
list.add(userId);
memorialHallUsers.put(dailyMissId,list);
}else{
strings.add(userId);
}
}
@OnClose
public void onClose(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
WebsocketUtil.removeSession(userId);
List<String> strings = memorialHallUsers.get(dailyMissId);
if(strings != null){
strings.remove(userId);
}
}
@OnMessage
public void onMessage(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session, String message) {
/*System.out.println(dailyMissId);
System.out.println(userId);
System.out.println(session);
System.out.println(message);*/
//MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);
List<String> strings = memorialHallUsers.get(dailyMissId);
if(strings == null || strings.isEmpty()){
return;
}
Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, userId)).collect(Collectors.toSet());
//对同纪念馆的在线用户进行广播
WebsocketUtil.sendMessageForUsers(collect,message);
}
@OnError
public void onError(Session session, Throwable throwable) {
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
}
throwable.printStackTrace();
}
}
3.网关允许WebScoket
- id: ruoyi-foundationWebSocket
uri: lb:ws://ruoyi-foundation
predicates:
- Path=/foundationWebSocket/**
filters:
- StripPrefix=1
4.测试
5.线上nginx配置
location /mmwzGateWay/ {
if ($request_method = OPTIONS) {
add_header Access-Control-Allow-Origin $http_origin;
add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept";
add_header Access-Control-Allow-Methods GET,POST,OPTIONS,HEAD,PUT,DELETE;
add_header Access-Control-Allow-Credentials true;
return 200;
}
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# WebSocket 相关的头部配置
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Forwarded-Proto $scheme;
proxy_pass http://mmwz-gateway:8080/;
#proxy_pass http://www.baidu.com/;
}