当前位置: 首页 > article >正文

SpringBoot整合WebSocket实现消息推送或聊天功能示例

最近在做一个功能,就是需要实时给用户推送消息,所以就需要用到 websocket

springboot 接入 websocket 非常简单,只需要下面几个配置即可
pom 文件

		<!-- spring-boot-web启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- lombok插件 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

application.yml

server:
  port: 1001

logging:
  config: classpath:logback-spring.xml

spring:
  profiles:
    active: dev

config 文件,我之前参考的别人的博客就是没有这个,导致怎么都请求不了,这个要注意

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author Sakura
 * @date 2024/9/13 14:02
 * 开启websocket支持
 */
@Configuration
public class WebSocketConfig {

    // 使用boot内置tomcat时需要注入此bean
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

工具类

import lombok.extern.log4j.Log4j2;

import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Sakura
 * @date 2024/9/13 11:40
 */
@Log4j2
public class WebsocketUtil {

    private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();

    /**
     * 添加session
     */
    public static void addSession(String userId, Session session){
        // 一个用户只允许一个session链接
        ONLINE_SESSION.putIfAbsent(userId, session);
        log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 移除session
     */
    public static void removeSession(String userId){
        ONLINE_SESSION.remove(userId);
        log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 给单个用户推送消息
     */
    public static void sendMessage(String userId, String message){
        Session session = ONLINE_SESSION.get(userId);
        if(session == null){
            log.warn("Session for user [{}] not found", userId);
            return;
        }
        sendMessage(session, message);
    }

    public static void sendMessage(Session session, String message) {
        if (session != null) {
            session.getAsyncRemote().sendText(message);
        }
    }

    /**
     * 给所有用户发消息
     */
    public static void sendMessageForAll(String message) {
        ONLINE_SESSION.forEach((userId, session) -> {
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }

    /**
     * 给指定的多个用户推送消息
     */
    public static void sendMessageForUsers(Set<String> userIds, String message) {
        userIds.forEach(userId -> {
            Session session = ONLINE_SESSION.get(userId);
            if (session == null) {
                log.warn("Session for user [{}] not found", userId);
                return;
            }
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }

}

WebsocketController

import com.yike.websocket.util.WebsocketUtil;
import lombok.extern.java.Log;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

/**
 * @author Sakura
 * @date 2024/9/13 11:41
 */
@Component
@ServerEndpoint(value = "/chat/{userId}")
@Log
public class WebsocketController {

    /**
     * 连接事件,加入注解
     * @param userId
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam(value = "userId") String userId, Session session) {
        log.info("WebSocket连接成功,用户ID: " + userId);
        // 添加到session的映射关系中
        WebsocketUtil.addSession(userId, session);
        // 广播通知,某用户上线了
//        WebsocketUtil.sendMessageForAll(message);
    }

    /**
     * 连接事件,加入注解
     * 用户断开链接
     *
     * @param userId
     * @param session
     */
    @OnClose
    public void onClose(@PathParam(value = "userId") String userId, Session session) {
        log.info("WebSocket连接断开,用户ID: " + userId);
        // 删除映射关系
        WebsocketUtil.removeSession(userId);
        // 广播通知,用户下线了
//        WebsocketUtil.sendMessageForAll(message);
    }

    /**
     * 当接收到用户上传的消息
     *
     * @param userId
     * @param session
     */
    @OnMessage
    public void onMessage(@PathParam(value = "userId") String userId, Session session, String message) {
        log.info("用户ID: " + userId + " 发送消息: " + message);
        // 直接广播
//        WebsocketUtil.sendMessageForAll(msg);
    }

    /**
     * 处理用户活连接异常
     *
     * @param session
     * @param throwable
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        log.info("用户异常断开链接,原因: " + throwable.getMessage());
        try {
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        throwable.printStackTrace();
    }
}

然后我们加一个测试用的发消息接口

import com.yike.websocket.util.WebsocketUtil;
import lombok.extern.java.Log;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Set;

/**
 * @author Sakura
 * @date 2024/9/13 13:38
 */
@RestController
@RequestMapping("/msg")
@Log
public class MsgController {

    @PostMapping("/send")
    public void send(@RequestParam("id") String id, @RequestParam("message") String message) {
        log.info("发送消息给:" + id + "-" + message);
        WebsocketUtil.sendMessage(id, message);
    }

    @PostMapping("/sendAll")
    public void sendAll(@RequestParam("message") String message) {
        log.info("群发消息:" + message);
        WebsocketUtil.sendMessageForAll(message);
    }

    @PostMapping("/sendUserList")
    public void sendAll(@RequestParam("userIds") Set<String> userIds, @RequestParam("message") String message) {
        log.info("发送多人消息:" + userIds.toString() + message);
        WebsocketUtil.sendMessageForAll(message);
    }
}

接下来我们开始测试,这里我用的 apipost,听说 postman 也可以连接 websocket 但是我试了下没找到在哪里

在 apipost 里面选择新建Websocket就可以了

在这里插入图片描述

刚才我们在 WebsocketController 里面写的地址是 /chat/{userId}

在这里插入图片描述

所以这里就填 ws://127.0.0.1:1000/chat/123, 后面那个123就是用户唯一标识,通常情况就是客户端登录成功后拿到用户ID了然后再通过这个ID来建立 websocket 连接

我们点击那个连接,可以看到提示连接成功了

在这里插入图片描述

看一下控制台,这里显示两个用户是因为我建立了两个连接方面后面测试,大家也是一样,换一下后面的 userId 就可以

在这里插入图片描述

下面我们通过这几个接口测试给客户端发消息

在这里插入图片描述

首先是给单个用户发消息,我们给用户 123 发消息,这里用 JSON 字符串是为了分辨不同的消息类型,让客户端知道要做什么,大家可以随便定义,反正就是个字符串类型

在这里插入图片描述

可以看到控制台提示发送成功了

在这里插入图片描述

我们去看下 123 的控制台,可以看到拿到消息了

在这里插入图片描述

然后群发消息 “大家好”

在这里插入图片描述

看下一 456,收到了

在这里插入图片描述
123 也收到了
在这里插入图片描述

给多个用户发消息的也是一样的

在这里插入图片描述

我这里因为只是服务端给客户端推送消息,如果大家要做聊天功能的话就需要自己通过这三个接口来写业务逻辑实现

客户端发消息到服务端

在这里插入图片描述

看一下控制台可以看到已经收到消息了

在这里插入图片描述

大家要是想做聊天工具的话就需要在下面这个接口里面加逻辑,比如客户端发 JSON 格式的字符串,然后里面指定好用户和消息内容这些就可以,当然大家最好做好认证这些,确保消息是用户自己发出的

在这里插入图片描述

普通的 springboot 项目上面那些就够了,但是因为我的项目是 springcloud 的,我这边想的是把这个服务单独出来,然后其它服务通过 openfeign 来调这个服务给客户端推送消息,所以这里面就整合了nacos 和 gateway,然后前端通过公共的域名来访问,比如 wss://www.sakura.com/api-websocket/chat/2(注意:因为我域名是HTTPS协议的,所以连接的时候要用 wss)

好了接下来我只说重点的配置,其它的不动的大家不懂可以直接问我即可

首先是启动文件,因为我这是个单独的服务,所以不需要连接数据库这些,所以要加上 exclude= {DataSourceAutoConfiguration.class}

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * @author Sakura
 * @date 2024/9/13 11:43
 */
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class WebSocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSocketApplication.class, args);
    }

}

然后就是gateway的路由配置

在这里插入图片描述

feign 接口大家根据自己的项目写就好

在这里插入图片描述

最后就是域名的 nginx 配置,我们在里面加上这个

location /api-websocket/ {
        proxy_pass http://localhost:1001/;

        # WebSocket 相关的头部配置
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 避免 WebSocket 超时断开
        proxy_read_timeout 3600;
        proxy_send_timeout 3600;
    }

然后就可以连接了

在这里插入图片描述


http://www.kler.cn/a/309732.html

相关文章:

  • 现代Web开发:Vue 3 组件化开发实战
  • 云计算:定义、类型及对企业的影响
  • Go语言中的`io.Pipe`:实现进程间通信的利器
  • 【量化交易笔记】14.模拟盘效果
  • 如何在 CentOS 6 上设置 NFS 挂载
  • linux rocky 9.4部署和管理docker harbor私有源
  • 从头开始学MyBatis—02基于xml和注解分别实现的增删改查
  • (java+Seleniums3)自动化测试实战2
  • ssh远程连接try1账号切换tips
  • 宝兰德MCP系列介绍 ①:中间件管理能力全线升级,驱动企业数字化管理效能提升
  • node.js 完全卸载和下载配置
  • 五种数据库特性对比(Redis/Mysql/SQLite/ES/MongoDB)
  • 780nm 扫地机器人模组:科技引领智能清洁新潮流
  • windows使用tcpdump.exe工具进行抓包教程
  • 企业如何构建有效的数据泄露防护安全体系
  • [yotroy.cool] MGT 388 - Finance for Engineers - notes 笔记
  • kubelet组件的启动流程源码分析
  • Openlayers 报错记录 Failed to execute ‘toDataURL‘ on ‘HTMLCanvasElement‘:
  • 三、(JS)JS中常见的表单事件
  • Nodejs+vue+Express游戏分享网站的设计与实现 7a2s2
  • linux 中gitee配置
  • 如何进行DAP-seq的数据挖掘,筛选验证位点
  • 【网络安全】-rce漏洞-pikachu
  • P2P应用
  • 禹神3小时快速上手typescript
  • 区块链审计 如何测试solidity的bool值占用几个字节