当前位置: 首页 > article >正文

redis 的发布订阅解决分布式下的websocket session 共享问题

1.项目背景 :使用websocket 和前端通信但是后端是分布式部署,session 无法共享,并且websocketSession 无法序列化,所以无法把session存到redis 中。

2.基础代码:

@Configuration
@Slf4j
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_CHANNEL));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }


}
@Slf4j
@Component
public class RedisReceiver implements MessageListener {

    @Autowired
    private TestHandler testHandler;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            log.info("RedisReceiver onMessage:{}",new String(message.getBody(), StandardCharsets.UTF_8));
            String channel = new String(message.getChannel());
            String content = new String(message.getBody(), StandardCharsets.UTF_8);
            if (Constants.REDIS_CHANNEL.endsWith(channel)) {
                testHandler.sendMessageToClient(content);
            }
        } catch (Exception e) {
            log.error("RedisReceiver 消息处理失败",e);
        }
    }

}
@Slf4j
@Component
public class TestHandler extends TextWebSocketHandler {

    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_DISPATCH_HOLDER = new ConcurrentHashMap<>();


    @Resource
    private StringRedisTemplate stringRedisTemplate;



    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        SESSION_DISPATCH_HOLDER.put(session.getId(), session);
        log.info("webSocket already connected:{}",session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
        SESSION_DISPATCH_HOLDER.remove(session.getId());
        log.info("webSocket disConnect:{}",session.getId());
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        log.info("handleMessage enter message:{}", JsonUtils.objToJsonStr(message.getPayload()));
        TextMessage textMessage = new TextMessage("");
        try {
            session.sendMessage(textMessage);
        } catch (IOException e) {
            log.error("发送消息失败", e);
        }
    }

 

  
    public void sendMessageToClient(String contentList) {
        log.info("sendMessageToClient:{}", contentList);
        if (CollectionUtils.isEmpty(SESSION_DISPATCH_HOLDER)) {
            log.info("没有连接中的客户端");
            return;
        }
        SESSION_DISPATCH_HOLDER.values().parallelStream().forEach(session -> {
            try {
                TextMessage textMessage = new TextMessage(contentList);
                session.sendMessage(textMessage);
                log.info("websocket 消息推送成功:{}", contentList);
            } catch (Exception e) {
                log.error("发送消息失败", e);
            }
        });
    }



}


http://www.kler.cn/a/325715.html

相关文章:

  • KF UKF
  • 详解Rust的数据类型和语法
  • 基于STM32的智能语音识别饮水机系统设计
  • python高级之简单爬虫实现
  • Spark分布式计算中Shuffle Read 和 Shuffle Write的职责和区别
  • Linux下编译MFEM
  • 代码随想录算法训练营Day15
  • 【面试题】软件测试实习(含答案)
  • 828华为云征文|针对Flexus X实例云服务器的CPU和内存性能测评
  • Quill Editor 富文本编辑器的高度问题
  • SWAP、AquaCrop、FVCOM、Delft3D、SWAT、R+VIC、HSPF、HEC-HMS......
  • 云计算中过等保三级需要的网络安全设备及详细讲解
  • 可视化大屏
  • CTFshow-SSRF
  • JSP(Java Server Pages)基础使用二
  • 自制CANTool_DBC_Layout仿制_布局读取Signal(三)
  • 【ShuQiHere】AVL 树(AVL Tree):如何保持二叉搜索树的平衡性?
  • 重构长方法之提取方法
  • 9.26-9.29学习
  • 信息安全数学基础(21)高次同余式的解数及解法
  • 【C++题目】7.双指针_和为 s 的两个数字
  • Python | Leetcode Python题解之第447题回旋镖的数量
  • 【linux 多进程并发】linux进程状态与生命周期各阶段转换,进程状态查看分析,助力高性能优化
  • 【C++——文件操作】
  • Allen Institute for Artificial Intelligence (Ai2) 发布开源多模态语言模型 Molmo
  • Mixture-of-Experts (MoE): 条件计算的诞生与崛起【下篇】