即时通讯代码优化
在线用户逻辑修复
在进行测试时,发现当前代码有个问题:如果test1连接的是服务器上的服务,本地的test2给test1发消息,虽然test1能够收到消息,但是本地服务的日志中会报test1不在线,需要对这种情况进行修复
修复方案:使用redis存储在线用户
- WebSocketEndpoint
利用setBit记录登录用户:key固定为ONLINE_USER,offset为用户名的hashCode。不同用户名的hashCode有可能冲突,但是概率较小,可以接受。
package com.example.im.endpoint;
import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket endpoint published at {@code /ws}.
 * <p>
 * Every connection is tracked twice: locally in {@link #WEB_SOCKET_ENDPOINT_MAP}
 * (so this node can reach the session) and in a Redis bitmap stored under
 * {@link #ONLINE_USER} (so other nodes can tell that the user is online).
 * Collaborators are kept in static fields and filled through the
 * {@code @Autowired} setters below.
 *
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);
    /**
     * Endpoints connected to this node, keyed by {@code userName.hashCode()}.
     */
    public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();
    /**
     * Redis key of the bitmap that marks online users.
     */
    public static final String ONLINE_USER = "cus:ws:online-user";
    private Session session;
    private static WebSocketMessageService webSocketMessageService;
    private static RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        WebSocketEndpoint.redisTemplate = redisTemplate;
    }

    /**
     * Converts a user name into its bit offset inside the {@link #ONLINE_USER} bitmap.
     * <p>
     * {@link String#hashCode()} may be negative, and Redis rejects negative bit
     * offsets, so the hash is widened as an unsigned 32-bit value. Non-negative
     * hashes keep the offset they had before; negative ones map to the upper half
     * of the range without introducing additional collisions.
     * NOTE(review): code that reads the bitmap (getBit) should use this helper as well.
     *
     * @param userName user name taken from the connection
     * @return offset in the range {@code [0, 2^32 - 1]}
     */
    public static long onlineUserOffset(String userName) {
        return Integer.toUnsignedLong(userName.hashCode());
    }

    /**
     * Called when a WebSocket connection is opened: marks the user online in
     * Redis and registers this endpoint in the local map.
     *
     * @param session session of the new connection
     */
    @OnOpen
    public void onOpen(Session session) {
        String userName = getUserName(session);
        logger.info("The connection is successful:{}", userName);
        this.session = session;
        // Hash collisions between different user names are possible but unlikely and accepted.
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineUserOffset(userName), true);
        WEB_SOCKET_ENDPOINT_MAP.put(userName.hashCode(), this);
    }

    /**
     * Called when a WebSocket connection is closed: unregisters this endpoint
     * and, if it was still the registered one, marks the user offline in Redis.
     *
     * @param session session of the closing connection
     */
    @OnClose
    public void onClose(Session session) {
        String userName = getUserName(session);
        // Remove the entry only when it still points at this endpoint; otherwise a
        // newer connection of the same user would be unregistered and flagged
        // offline by the shutdown of an older one.
        if (WEB_SOCKET_ENDPOINT_MAP.remove(userName.hashCode(), this)) {
            redisTemplate.opsForValue().setBit(ONLINE_USER, onlineUserOffset(userName), false);
        }
        logger.info("Disconnect:{}", userName);
    }

    /**
     * Called for every text message received from the client; hands the message
     * to the message service together with the sender's user name.
     *
     * @param message message content
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String sendUserName = getUserName(session);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    /**
     * Reads the first {@code userName} request parameter of the session.
     *
     * @param session session to inspect
     * @return the user name, or {@code "anonymous_users"} when the parameter is absent
     */
    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName"))
                .flatMap(values -> values.stream().findFirst())
                .orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}
- DefaultSendExecutor
适配改变类型后的WEB_SOCKET_ENDPOINT_MAP,并调整代码结构(注:下方贴出的代码与上文的WebSocketEndpoint完全相同,DefaultSendExecutor的实际代码在此处缺失)
package com.example.im.endpoint;
import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket endpoint published at {@code /ws}.
 * <p>
 * Every connection is tracked twice: locally in {@link #WEB_SOCKET_ENDPOINT_MAP}
 * (so this node can reach the session) and in a Redis bitmap stored under
 * {@link #ONLINE_USER} (so other nodes can tell that the user is online).
 * Collaborators are kept in static fields and filled through the
 * {@code @Autowired} setters below.
 *
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);
    /**
     * Endpoints connected to this node, keyed by {@code userName.hashCode()}.
     */
    public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();
    /**
     * Redis key of the bitmap that marks online users.
     */
    public static final String ONLINE_USER = "cus:ws:online-user";
    private Session session;
    private static WebSocketMessageService webSocketMessageService;
    private static RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        WebSocketEndpoint.redisTemplate = redisTemplate;
    }

    /**
     * Converts a user name into its bit offset inside the {@link #ONLINE_USER} bitmap.
     * <p>
     * {@link String#hashCode()} may be negative, and Redis rejects negative bit
     * offsets, so the hash is widened as an unsigned 32-bit value. Non-negative
     * hashes keep the offset they had before; negative ones map to the upper half
     * of the range without introducing additional collisions.
     * NOTE(review): code that reads the bitmap (getBit) should use this helper as well.
     *
     * @param userName user name taken from the connection
     * @return offset in the range {@code [0, 2^32 - 1]}
     */
    public static long onlineUserOffset(String userName) {
        return Integer.toUnsignedLong(userName.hashCode());
    }

    /**
     * Called when a WebSocket connection is opened: marks the user online in
     * Redis and registers this endpoint in the local map.
     *
     * @param session session of the new connection
     */
    @OnOpen
    public void onOpen(Session session) {
        String userName = getUserName(session);
        logger.info("The connection is successful:{}", userName);
        this.session = session;
        // Hash collisions between different user names are possible but unlikely and accepted.
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineUserOffset(userName), true);
        WEB_SOCKET_ENDPOINT_MAP.put(userName.hashCode(), this);
    }

    /**
     * Called when a WebSocket connection is closed: unregisters this endpoint
     * and, if it was still the registered one, marks the user offline in Redis.
     *
     * @param session session of the closing connection
     */
    @OnClose
    public void onClose(Session session) {
        String userName = getUserName(session);
        // Remove the entry only when it still points at this endpoint; otherwise a
        // newer connection of the same user would be unregistered and flagged
        // offline by the shutdown of an older one.
        if (WEB_SOCKET_ENDPOINT_MAP.remove(userName.hashCode(), this)) {
            redisTemplate.opsForValue().setBit(ONLINE_USER, onlineUserOffset(userName), false);
        }
        logger.info("Disconnect:{}", userName);
    }

    /**
     * Called for every text message received from the client; hands the message
     * to the message service together with the sender's user name.
     *
     * @param message message content
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String sendUserName = getUserName(session);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    /**
     * Reads the first {@code userName} request parameter of the session.
     *
     * @param session session to inspect
     * @return the user name, or {@code "anonymous_users"} when the parameter is absent
     */
    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName"))
                .flatMap(values -> values.stream().findFirst())
                .orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}
重复代码提取
在增加了stream和redis渠道后,消息监听处的代码有大段重复,可以进行提取处理
- MessageInfoUtil
package com.example.im.infra.executor.send.util;
import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Helper shared by the message listeners: decodes a serialized
 * {@code MessageInfo} and dispatches it through the {@code DefaultSendExecutor}.
 *
 * @author PC
 */
@Component
public class MessageInfoUtil {
    private static final Logger logger = LoggerFactory.getLogger(MessageInfoUtil.class);
    private DefaultSendExecutor defaultSendExecutor;

    @Autowired
    public void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {
        this.defaultSendExecutor = defaultSendExecutor;
    }

    /**
     * Decodes the UTF-8 JSON payload into a {@code MessageInfo} and sends it
     * according to its sending scope (single user or everyone).
     *
     * @param messageInfoByte UTF-8 encoded JSON of a {@code MessageInfo}
     * @return the decoded message; may be {@code null} when the payload could not
     * be turned into a {@code MessageInfo} (depends on {@code JsonUtils} — TODO confirm)
     */
    public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {
        String messageJson = new String(messageInfoByte, StandardCharsets.UTF_8);
        MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {
        });
        // A payload without a scope would otherwise fail with a NullPointerException
        // at the switch below; skip it with a warning instead.
        if (messageInfo == null || messageInfo.getScopeOfSending() == null) {
            logger.warn("ignore message without a valid sending range");
            return messageInfo;
        }
        switch (messageInfo.getScopeOfSending()) {
            case USER:
                defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            case ALL:
                defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            default:
                // Not expected in practice unless ScopeOfSending has been extended;
                // this is a possible extension point for further sending scopes.
                logger.warn("invalid sending range:{}", messageInfo.getScopeOfSending().getScopeCode());
                break;
        }
        return messageInfo;
    }
}
- RedisMessageListener
package com.example.im.infra.executor.send.redis;
import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
 * Listener for messages published through Redis; each received body is
 * forwarded to {@code MessageInfoUtil} for decoding and dispatch.
 *
 * @author PC
 */
@Component
public class RedisMessageListener implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
    private MessageInfoUtil messageInfoUtil;

    @Autowired
    public void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {
        this.messageInfoUtil = messageInfoUtil;
    }

    /**
     * Handles one Redis message by passing its raw body on for dispatch.
     *
     * @param message received Redis message
     * @param pattern pattern that matched the channel (not used)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        logger.debug("send redis info");
        byte[] messageBody = message.getBody();
        messageInfoUtil.sendMessageByMessageInfoByte(messageBody);
    }
}
- StreamMessageListener
package com.example.im.infra.executor.send.stream;
import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Function;
/**
 * Message-queue listener: exposes a reactive function bean that forwards each
 * received payload to {@code MessageInfoUtil} for decoding and dispatch.
 *
 * @author PC
 */
@Component
public class StreamMessageListener {
    // The logger belongs to this class (it previously pointed at StreamSendExecutor).
    private static final Logger logger = LoggerFactory.getLogger(StreamMessageListener.class);
    private MessageInfoUtil messageInfoUtil;

    @Autowired
    public void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {
        this.messageInfoUtil = messageInfoUtil;
    }

    /**
     * Builds the consuming function for the incoming message flux.
     *
     * @return function that dispatches every message and completes when the flux completes
     */
    @Bean
    public Function<Flux<Message<byte[]>>, Mono<Void>> listener() {
        return messageInfoFlux -> messageInfoFlux.doOnNext(this::dispatch).then();
    }

    /**
     * Dispatches a single message. Failures are logged and swallowed here so
     * that one bad message does not terminate the whole flux.
     *
     * @param message message whose payload is a serialized {@code MessageInfo}
     */
    private void dispatch(Message<byte[]> message) {
        // Logged per message; inside listener() it would only run once, at bean creation.
        logger.debug("send stream info");
        try {
            messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload());
        } catch (RuntimeException e) {
            logger.error("failed to dispatch stream message", e);
        }
    }
}
未用到的bean不加载
当未用到某些渠道时,无需进行相关配置,如使用了redis渠道,yml中就无需配置kafka信息
Redis渠道
调整config文件,当渠道非redis时对redis渠道相关bean不进行加载
- WebSocketConfig
package com.example.im.config;
import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.annotation.Resource;
/**
 * WebSocket configuration: declares the server endpoint exporter and the
 * thread pool, the latter sized from {@code WebSocketProperties}.
 *
 * @author PC
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Resource
    private WebSocketProperties webSocketProperties;

    /**
     * Declares the {@code ServerEndpointExporter} bean.
     *
     * @return a new exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

    /**
     * Declares the thread pool.
     *
     * @return thread pool configured from the executor properties
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        WebSocketProperties.ExecutorProperties settings = webSocketProperties.getExecutorProperties();
        // Pool sizing: core threads, maximum threads and queue capacity.
        pool.setCorePoolSize(settings.getCorePoolSize());
        pool.setMaxPoolSize(settings.getMaxPoolSize());
        pool.setQueueCapacity(settings.getQueueCapacity());
        // Keep-alive time of idle threads, in seconds.
        pool.setKeepAliveSeconds(settings.getKeepAliveSeconds());
        // Prefix of the thread names.
        pool.setThreadNamePrefix("im-");
        // Policy applied when a task is rejected.
        pool.setRejectedExecutionHandler(new ImRejectExecutionHandler());
        // Let queued tasks finish before the pool shuts down.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        return pool;
    }
}
- RedisConfiguration
缓存用到了redis,RedisTemplate需要加载
package com.example.im.config;
import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Redis configuration. The {@code RedisTemplate} is always declared; the
 * listener container and adapter are declared only when
 * {@code cus.ws.communication-type} is {@code redis}.
 *
 * @author PC
 */
@Configuration
public class RedisConfiguration {
    /**
     * Declares a {@code RedisTemplate} that uses a string serializer for keys,
     * values, hash keys and hash values.
     *
     * @param connectionFactory connection factory backing the template
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
        StringRedisSerializer serializer = new StringRedisSerializer();
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(serializer);
        template.setStringSerializer(serializer);
        template.setKeySerializer(serializer);
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(serializer);
        template.setHashValueSerializer(serializer);
        return template;
    }

    /**
     * Declares the listener container subscribed to the {@code redis-websocket-*} pattern.
     *
     * @param connectionFactory connection factory used by the container
     * @param listenerAdapter   adapter that receives the messages
     * @return the configured container
     */
    @Bean
    @ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Subscribe to one or more channels through a pattern topic.
        PatternTopic topic = new PatternTopic("redis-websocket-*");
        listenerContainer.addMessageListener(listenerAdapter, topic);
        return listenerContainer;
    }

    /**
     * Declares the adapter wrapping the Redis message listener.
     *
     * @param redisMessageListener listener to delegate to
     * @return adapter around the listener
     */
    @Bean
    @ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
    MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(redisMessageListener);
        return adapter;
    }
}
- RedisMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisMessageListener implements MessageListener
- RedisSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisSendExecutor extends AbstractBaseSendExecutor
Kafka渠道
- StreamMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamMessageListener
- StreamSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamSendExecutor extends AbstractBaseSendExecutor
参考资料
[1].im项目