当前位置: 首页 > article >正文

Spring定时任务+webSocket实现定时给指定用户发送消息

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

Spring定时任务 + webSocket实现定时给指定用户发送消息:类似于消息中心;
相信有需求的小伙伴读此文章可以有一定的帮助或者思路

逻辑思路

在做这个业务的时候也遇到了很多的坑,但是现在我帮你踩完了。

  • 使用Spring定时任务框架(@Scheduled)和websocket网络通信协议框架来进行定时向指定用户的客户端推送消息;
  • 编写并设置webSocketConfig配置文件(百度上的常规配置即可);
  • 基于@ServerEndpoint(value = “/websocket/{userId}”)注解标记路径并让前端进行连接服务器端
  • SocketServer中都是使用ConcurrentHashMap集合来进行接收和存储
    • 它是Java 5中引入了ConcurrentHashMap这个并发集合类
    • ConcurrentHashMap通过将一个大的数据结构分解为多个小的数据结构,然后对每个小的数据结构加锁来实现线程安全。这种方式称为分段锁。分段锁我记得是有问题的:性能不好(什么原因导致的有时间去查:查询热点数据),在JDK8时弃用后采用CAS和synchronized组合的方式来保证并发安全。
    • Java 8中的ConcurrentHashMap在内部实现上采用了数组+链表+红黑树的数据结构,也使得其具有较好的查找和遍历性能
    • 有兴趣的小伙伴可以去阅读下:一文看懂 jdk8 中的 ConcurrentHashMap,个人感觉写的很不错。
  • 集合clients用于记录连接的客户端(session)、key为生成的uuid;集合conn用于存放clients数据,key为用户唯一标识(id)。同时也充分使用了websocket的生命周期进行资源释放等操作。
  • 自定义方法sendMessageByUserId(Integer userId, String message),基于userId获取conn中的set集合,基于iterator进行集合迭代,使用hasNext()返回的Boolean进行判断是否继续进行循环遍历,.next()获取其中uuid数据(指针移动),通过uuid取出存在clients集合中的session进行发送数据
  • ===========================================以上是WebSocket实现思路
  • 基于@EnableScheduling注解开启定时任务,基于@Scheduled(cron = “……”)注解和cron表达式进行任务执行。
  • 业务逻辑:不用的人业务不同,所以这里我省略掉了我的业务逻辑,有需求的小伙伴可以私信我进一步沟通;
  • 调用websocket的sendMessageUserById()方法来进行返回数据;

代码实现

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.7.0</version>
</dependency>

webSocketServer.java

/**
 * @Project: JavaLaity
 * @Description: 服务端
 */
@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{userId}")
public class WebSocketServer {

    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象,若要实现服务器端与单一客户端通信的话,可以使用Map来存放,其中key可以为用户标识
     */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    // public static ThreadLocal<SysUser> clientUser = new ThreadLocal<>();
    /**
     * 用于记录连接的客户端 (sid,session)
     */
    public static Map<String, Session> clients = new ConcurrentHashMap<>();
    /**
     * 基于userId关联sid(用于解决同一用户id,在多个web端连接的问题)
     */
    public static Map<Integer, Set<String>> conn = new ConcurrentHashMap<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    private String sid = null;


    // 当WebSocket连接建立时,会调用标注有@OnOpen的方法
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") Integer userId) {
        System.out.println("WebSocket opened: " + session.getId());
        this.session = session;
        this.sid = UUID.randomUUID().toString();
        clients.put(this.sid, session);

        Set<String> clientSet = conn.get(userId);
        if (clientSet == null) {
            clientSet = new HashSet<>();
            conn.put(userId, clientSet);
        }
        clientSet.add(this.sid);
        // 加入set中
        webSocketSet.add(this);
    }

    // 给所有当前连接的用户发送消息
    public void sendMessageAll(String message) {
        try {
            if (webSocketSet.size() != 0) {
                for (WebSocketServer item : webSocketSet) {
                    if (item != null) {
                        // 同步锁,解决多线程下发送消息异常关闭
                        // 避免高并发下多处频繁调用sendMessage()方法发送消息而导致的webSocket挂掉,我们在sendMessage这个方法里面加入同步锁,
                        // 锁住session,这样就能保障webSocket有条不絮的向前端推送消息
                        synchronized (item.session) {
                            item.session.getBasicRemote().sendText(message);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据用户ID发送给某一个用户
     */
    public void sendMessageByUserId(Integer userId, String message) {
        if (userId != null) {
            Set<String> clientSet = conn.get(userId);
            if (clientSet != null) {
                Iterator<String> iterator = clientSet.iterator();
                while (iterator.hasNext()) {
                    String sid = iterator.next();
                    Session session = clients.get(sid);
                    if (session != null) {
                        try {
                            session.getBasicRemote().sendText(message);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    // TODO 当浏览器关闭触发该事件
    @OnClose
    public void OnClose(Session session) {
        log.info(this.sid + "断开连接");
        clients.remove(this.sid);
        webSocketSet.remove(this);
    }

    // 当收到客户端(前端)发送的消息时,会调用@OnMessage方法
    @OnMessage
    public void OnMessage(String message, Session session) {
        System.out.println("Received message:" + message);
    }

    // TODO 发生错误时调用
    @OnError
    public void OnError(Session session, Throwable error) {
        System.out.println("发生错误!");
        error.printStackTrace();
    }
}

AideScheduleTask.java

/**
 * @Project: JavaLaity
 * @Description: 实现业务逻辑的位置
 */
@Configuration
public class AideScheduleTask {

    @Autowired
    WebSocketServer webSocketServer;

    @Autowired
    RedisCache redisCache;

    // 测试
    @Scheduled(cron = "0/20 * * * * ?")
    private void testTask() throws Exception {
        // 使用ThreadLocal - 哈哈哈,弃用了,行不通!。
        // TODO: 你需要的业务逻辑
        SysUser sysUser = new SysUser().setId(1)
        if (sysUser.getId() != null) {
            webSocketServer.sendMessageByUserId(sysUser.getId(), "这个是老大可以发:role = ");
        }
    }
}

注意问题

  • 在写Corn表达式时需要注意的点:

    • 外国对于周几的定义和中国是不一样的,中国1-7对应周一到周日,外国的1-7对应的是周日到周六
    • 国内:1-7 MON,TUE,WED,THU,FRI,SAT,SUN
    • 国外:1-7 SUN,MON,TUE,WED,THU,FRI,SAT √
  • 个人认为有性能问题以及数据库压力的大问题,这里本人就只是简单的分享下设计思路,不同业务需求,实现策略不同。也有肯定公司不在乎这种性能问题。

愿每个人都能遵循自己的时钟,做不后悔的选择。我是Laity,正在前行的Laity。


http://www.kler.cn/a/107758.html

相关文章:

  • K8s进阶使用
  • 2024 年 8 个最佳 API 设计工具图文介绍
  • 深度学习之 LSTM
  • linux 下查看程序启动的目录
  • AcWing 300 任务安排1
  • pgsql和mysql的自增主键差异
  • SpringBoot内置工具类之断言Assert的使用与部分解析
  • CVPR2023新作:基于组合空时位移的视频修复
  • Tensorflow2 中模型训练标签顺序和预测结果标签顺序不一致问题解决办法
  • Jmeter调用Python脚本实现参数互相传递的实现
  • leetcode做题笔记204. 计数质数
  • Day13力扣打卡
  • java 读取pdf文件内容
  • 2023年香水行业数据分析:国人用香需求升级,高端香水高速增长
  • 【神印王座】易军献身为林鑫挡箭,万万没想到林鑫太坑,大跌眼镜
  • LLM在text2sql上的应用 | 京东云技术团队
  • Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (四)
  • 听GPT 讲Rust源代码--library/std(7)
  • docker与宿主机共享内存通信
  • css正确的语法
  • 微服务-Feign
  • 决定放弃uniapp开发了,因为它实在是没有taro友好
  • 银河麒麟v10x86或者arm离线安装服务
  • 【Python入门教程】基于OpenCV视频分解成图片+图片组合成视频(视频抽帧组帧)
  • CentOS 使用线程库Pthread 库
  • 美颜SDK集成指南:为应用添加视频美颜功能