SpringBoot整合WebSocket实现消息推送或聊天功能示例
最近在做一个功能,就是需要实时给用户推送消息,所以就需要用到 websocket
springboot 接入 websocket 非常简单,只需要下面几个配置即可
pom 文件
<!-- spring-boot-web启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- lombok插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
application.yml
server:
port: 1001
logging:
config: classpath:logback-spring.xml
spring:
profiles:
active: dev
config 文件,我之前参考的别人的博客就是没有这个,导致怎么都请求不了,这个要注意
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author Sakura
* @date 2024/9/13 14:02
* 开启websocket支持
*/
@Configuration
public class WebSocketConfig {
// 使用boot内置tomcat时需要注入此bean
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
工具类
import lombok.extern.log4j.Log4j2;
import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Sakura
* @date 2024/9/13 11:40
*/
@Log4j2
public class WebsocketUtil {
private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();
/**
* 添加session
*/
public static void addSession(String userId, Session session){
// 一个用户只允许一个session链接
ONLINE_SESSION.putIfAbsent(userId, session);
log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());
}
/**
* 移除session
*/
public static void removeSession(String userId){
ONLINE_SESSION.remove(userId);
log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());
}
/**
* 给单个用户推送消息
*/
public static void sendMessage(String userId, String message){
Session session = ONLINE_SESSION.get(userId);
if(session == null){
log.warn("Session for user [{}] not found", userId);
return;
}
sendMessage(session, message);
}
public static void sendMessage(Session session, String message) {
if (session != null) {
session.getAsyncRemote().sendText(message);
}
}
/**
* 给所有用户发消息
*/
public static void sendMessageForAll(String message) {
ONLINE_SESSION.forEach((userId, session) -> {
CompletableFuture.runAsync(() -> sendMessage(session, message))
.exceptionally(ex -> {
log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
return null;
});
});
}
/**
* 给指定的多个用户推送消息
*/
public static void sendMessageForUsers(Set<String> userIds, String message) {
userIds.forEach(userId -> {
Session session = ONLINE_SESSION.get(userId);
if (session == null) {
log.warn("Session for user [{}] not found", userId);
return;
}
CompletableFuture.runAsync(() -> sendMessage(session, message))
.exceptionally(ex -> {
log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
return null;
});
});
}
}
WebsocketController
import com.yike.websocket.util.WebsocketUtil;
import lombok.extern.java.Log;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/**
* @author Sakura
* @date 2024/9/13 11:41
*/
@Component
@ServerEndpoint(value = "/chat/{userId}")
@Log
public class WebsocketController {
/**
* 连接事件,加入注解
* @param userId
* @param session
*/
@OnOpen
public void onOpen(@PathParam(value = "userId") String userId, Session session) {
log.info("WebSocket连接成功,用户ID: " + userId);
// 添加到session的映射关系中
WebsocketUtil.addSession(userId, session);
// 广播通知,某用户上线了
// WebsocketUtil.sendMessageForAll(message);
}
/**
* 连接事件,加入注解
* 用户断开链接
*
* @param userId
* @param session
*/
@OnClose
public void onClose(@PathParam(value = "userId") String userId, Session session) {
log.info("WebSocket连接断开,用户ID: " + userId);
// 删除映射关系
WebsocketUtil.removeSession(userId);
// 广播通知,用户下线了
// WebsocketUtil.sendMessageForAll(message);
}
/**
* 当接收到用户上传的消息
*
* @param userId
* @param session
*/
@OnMessage
public void onMessage(@PathParam(value = "userId") String userId, Session session, String message) {
log.info("用户ID: " + userId + " 发送消息: " + message);
// 直接广播
// WebsocketUtil.sendMessageForAll(msg);
}
/**
* 处理用户活连接异常
*
* @param session
* @param throwable
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.info("用户异常断开链接,原因: " + throwable.getMessage());
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
}
throwable.printStackTrace();
}
}
然后我们加一个测试用的发消息接口
import com.yike.websocket.util.WebsocketUtil;
import lombok.extern.java.Log;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Set;
/**
* @author Sakura
* @date 2024/9/13 13:38
*/
@RestController
@RequestMapping("/msg")
@Log
public class MsgController {
@PostMapping("/send")
public void send(@RequestParam("id") String id, @RequestParam("message") String message) {
log.info("发送消息给:" + id + "-" + message);
WebsocketUtil.sendMessage(id, message);
}
@PostMapping("/sendAll")
public void sendAll(@RequestParam("message") String message) {
log.info("群发消息:" + message);
WebsocketUtil.sendMessageForAll(message);
}
@PostMapping("/sendUserList")
public void sendAll(@RequestParam("userIds") Set<String> userIds, @RequestParam("message") String message) {
log.info("发送多人消息:" + userIds.toString() + message);
WebsocketUtil.sendMessageForAll(message);
}
}
接下来我们开始测试,这里我用的 apipost,听说 postman 也可以连接 websocket 但是我试了下没找到在哪里
在 apipost 里面选择新建Websocket就可以了
刚才我们在 WebsocketController 里面写的地址是 /chat/{userId}
所以这里就填 ws://127.0.0.1:1000/chat/123, 后面那个123就是用户唯一标识,通常情况就是客户端登录成功后拿到用户ID了然后再通过这个ID来建立 websocket 连接
我们点击那个连接,可以看到提示连接成功了
看一下控制台,这里显示两个用户是因为我建立了两个连接方面后面测试,大家也是一样,换一下后面的 userId 就可以
下面我们通过这几个接口测试给客户端发消息
首先是给单个用户发消息,我们给用户 123 发消息,这里用 JSON 字符串是为了分辨不同的消息类型,让客户端知道要做什么,大家可以随便定义,反正就是个字符串类型
可以看到控制台提示发送成功了
我们去看下 123 的控制台,可以看到拿到消息了
然后群发消息 “大家好”
看下一 456,收到了
123 也收到了
给多个用户发消息的也是一样的
我这里因为只是服务端给客户端推送消息,如果大家要做聊天功能的话就需要自己通过这三个接口来写业务逻辑实现
客户端发消息到服务端
看一下控制台可以看到已经收到消息了
大家要是想做聊天工具的话就需要在下面这个接口里面加逻辑,比如客户端发 JSON 格式的字符串,然后里面指定好用户和消息内容这些就可以,当然大家最好做好认证这些,确保消息是用户自己发出的
普通的 springboot 项目上面那些就够了,但是因为我的项目是 springcloud 的,我这边想的是把这个服务单独出来,然后其它服务通过 openfeign 来调这个服务给客户端推送消息,所以这里面就整合了nacos 和 gateway,然后前端通过公共的域名来访问,比如 wss://www.sakura.com/api-websocket/chat/2(注意:因为我域名是HTTPS协议的,所以连接的时候要用 wss)
好了接下来我只说重点的配置,其它的不动的大家不懂可以直接问我即可
首先是启动文件,因为我这是个单独的服务,所以不需要连接数据库这些,所以要加上 exclude= {DataSourceAutoConfiguration.class}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
/**
* @author Sakura
* @date 2024/9/13 11:43
*/
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class WebSocketApplication {
public static void main(String[] args) {
SpringApplication.run(WebSocketApplication.class, args);
}
}
然后就是gateway的路由配置
feign 接口大家根据自己的项目写就好
最后就是域名的 nginx 配置,我们在里面加上这个
location /api-websocket/ {
proxy_pass http://localhost:1001/;
# WebSocket 相关的头部配置
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 避免 WebSocket 超时断开
proxy_read_timeout 3600;
proxy_send_timeout 3600;
}
然后就可以连接了