使用 Redis 发布/订阅解决分布式部署下的 WebSocket session 共享问题
1. 项目背景:后端使用 WebSocket 与前端通信,但后端是分布式部署,各节点之间无法共享 session;并且 WebSocketSession 无法序列化,所以不能把 session 存到 Redis 中。解决思路:每个节点只在本地内存中保存与自己建立连接的 session,需要推送的消息通过 Redis 发布/订阅广播给所有节点,各节点收到消息后再推送给自己本地持有的连接。
2.基础代码:
/**
 * Spring configuration that wires the Redis pub/sub listener infrastructure.
 *
 * <p>Declares the adapter that wraps {@link RedisReceiver} and the listener
 * container that subscribes that adapter to {@code Constants.REDIS_CHANNEL}.
 */
@Configuration
@Slf4j
public class RedisConfig {

    /**
     * Wraps the receiver in a listener adapter whose default listener method
     * is named {@code "onMessage"}.
     *
     * @param receiver the bean that handles incoming Redis messages
     * @return the adapter registered with the listener container
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    /**
     * Builds the listener container and subscribes the adapter to the topic
     * described by {@code Constants.REDIS_CHANNEL} (registered as a pattern topic).
     *
     * @param connectionFactory factory used by the container to reach Redis
     * @param listenerAdapter   listener to invoke for every matching message
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        PatternTopic topic = new PatternTopic(Constants.REDIS_CHANNEL);
        listenerContainer.addMessageListener(listenerAdapter, topic);

        return listenerContainer;
    }
}
@Slf4j
@Component
public class RedisReceiver implements MessageListener {
@Autowired
private TestHandler testHandler;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
log.info("RedisReceiver onMessage:{}",new String(message.getBody(), StandardCharsets.UTF_8));
String channel = new String(message.getChannel());
String content = new String(message.getBody(), StandardCharsets.UTF_8);
if (Constants.REDIS_CHANNEL.endsWith(channel)) {
testHandler.sendMessageToClient(content);
}
} catch (Exception e) {
log.error("RedisReceiver 消息处理失败",e);
}
}
}
/**
 * WebSocket handler that keeps the sessions connected to THIS node in an in-memory
 * registry and broadcasts text messages to them.
 *
 * <p>Sessions are registered in {@link #afterConnectionEstablished} and removed in
 * {@link #afterConnectionClosed}. {@link #sendMessageToClient(String)} pushes a
 * message to every registered session.
 */
@Slf4j
@Component
public class TestHandler extends TextWebSocketHandler {

    /** Sessions currently connected to this node, keyed by session id. */
    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_DISPATCH_HOLDER = new ConcurrentHashMap<>();

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Registers a newly connected session so it receives subsequent broadcasts.
     *
     * @param session the session that was just established
     * @throws Exception if the superclass callback fails
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        SESSION_DISPATCH_HOLDER.put(session.getId(), session);
        log.info("webSocket already connected:{}", session.getId());
    }

    /**
     * Removes a closed session from the registry.
     *
     * @param session the session that was closed
     * @param status  the close status reported by the container
     * @throws Exception if the superclass callback fails
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
        SESSION_DISPATCH_HOLDER.remove(session.getId());
        log.info("webSocket disConnect:{}", session.getId());
    }

    /**
     * Logs the payload of an inbound message and answers with an empty text frame.
     * An {@link IOException} while replying is logged, not rethrown.
     *
     * @param session the session the message arrived on
     * @param message the inbound message
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        log.info("handleMessage enter message:{}", JsonUtils.objToJsonStr(message.getPayload()));
        try {
            sendSerialized(session, new TextMessage(""));
        } catch (IOException e) {
            log.error("发送消息失败", e);
        }
    }

    /**
     * Broadcasts {@code contentList} as a text message to every open session
     * registered on this node.
     *
     * <p>Failures are isolated per session: an error while sending to one client is
     * logged and the remaining clients are still served. Sessions that are no longer
     * open are skipped.
     *
     * @param contentList the text payload to push to the clients
     */
    public void sendMessageToClient(String contentList) {
        log.info("sendMessageToClient:{}", contentList);
        if (SESSION_DISPATCH_HOLDER.isEmpty()) {
            log.info("没有连接中的客户端");
            return;
        }
        // Plain loop instead of parallelStream(): the sends are blocking I/O and would
        // otherwise occupy the shared ForkJoinPool.commonPool() worker threads.
        for (WebSocketSession session : SESSION_DISPATCH_HOLDER.values()) {
            if (!session.isOpen()) {
                // Closed but not yet removed by afterConnectionClosed; sending would only fail.
                log.debug("skip closed webSocket session:{}", session.getId());
                continue;
            }
            try {
                sendSerialized(session, new TextMessage(contentList));
                log.info("websocket 消息推送成功:{}", contentList);
            } catch (Exception e) {
                log.error("发送消息失败", e);
            }
        }
    }

    /**
     * Sends a message while holding the session's monitor so that at most one thread
     * writes to a given session at a time.
     *
     * <p>Broadcasts and replies from {@link #handleMessage} may run on different threads,
     * and Spring documents that the underlying standard WebSocket session does not allow
     * concurrent sending.
     *
     * @param session the target session
     * @param message the message to send
     * @throws IOException if the underlying send fails
     */
    private void sendSerialized(WebSocketSession session, TextMessage message) throws IOException {
        synchronized (session) {
            session.sendMessage(message);
        }
    }
}