当前位置: 首页 > article >正文

即时通讯代码优化

在线用户逻辑修复

在进行测试时,发现当前代码有个问题:如果test1连接在服务器上的实例,本地的test2给test1发消息,虽然test1能收到消息,但是本地服务的日志中会报test1不在线,需要对这种情况进行修复

修复方案:使用redis存储在线用户

  • WebSocketEndpoint

利用setBit记录登录用户:redis的key固定为ONLINE_USER,位偏移量(offset)由用户名的hashCode得到。hashCode有可能冲突,但是概率较小,可以接受。

package com.example.im.endpoint;

import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint published at {@code /ws}.
 *
 * <p>Every connection is registered twice: locally in {@link #WEB_SOCKET_ENDPOINT_MAP}
 * and in the Redis bitmap stored under {@link #ONLINE_USER}.
 *
 * <p>The collaborators are injected through instance setters but kept in static
 * fields; presumably this is so that endpoint instances that are not created by
 * Spring can still reach them (review: confirm against the container in use).
 *
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    /**
     * Endpoints registered by {@link #onOpen(Session)}, keyed by
     * {@code userName.hashCode()}. Two user names with the same hash code overwrite
     * each other; the author accepts this as a low-probability event.
     */
    public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();

    /**
     * Redis key of the bitmap that records which users are online.
     */
    public static final String ONLINE_USER = "cus:ws:online-user";

    private Session session;
    private static WebSocketMessageService webSocketMessageService;

    private static RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        WebSocketEndpoint.redisTemplate = redisTemplate;
    }

    /**
     * Maps a user-name hash code to the bit offset used inside the
     * {@link #ONLINE_USER} bitmap.
     *
     * <p>{@code String.hashCode()} can be negative, and Redis rejects negative
     * SETBIT/GETBIT offsets. The hash is therefore read as an unsigned 32-bit
     * value. Non-negative hash codes map to themselves, so offsets written
     * before this change stay valid. Any code that reads the bitmap has to use
     * the same mapping.
     *
     * <p>NOTE(review): large offsets make Redis grow the bitmap up to the offset;
     * confirm the resulting memory footprint is acceptable.
     *
     * @param userNameHashCode result of {@code userName.hashCode()}
     * @return offset in the range {@code 0 .. 2^32 - 1}
     */
    public static long onlineBitOffset(int userNameHashCode) {
        return Integer.toUnsignedLong(userNameHashCode);
    }

    /**
     * Handles a newly opened connection: marks the user online in Redis and
     * registers this endpoint locally.
     *
     * @param session the session that was opened
     */
    @OnOpen
    public void onOpen(Session session) {
        String userName = getUserName(session);
        logger.info("The connection is successful:{}", userName);
        this.session = session;
        // Hash collisions between user names are possible but considered negligible.
        int hashCode = userName.hashCode();
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineBitOffset(hashCode), true);
        WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);
    }

    /**
     * Handles a closed connection: clears the user's online bit in Redis and
     * removes the local registration.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        String userName = getUserName(session);
        int hashCode = userName.hashCode();
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineBitOffset(hashCode), false);
        WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);
        logger.info("Disconnect:{}", userName);
    }

    /**
     * Forwards a received text message, together with the sender's user name,
     * to the message service.
     *
     * @param message message content
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String sendUserName = getUserName(session);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    /**
     * Reads the first {@code userName} request parameter of the session.
     *
     * @param session session to inspect
     * @return the user name, or {@code "anonymous_users"} when the parameter is absent
     */
    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName"))
                .orElseGet(ArrayList::new)
                .stream()
                .findFirst()
                .orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}
  • DefaultSendExecutor

适配改变类型后的WEB_SOCKET_ENDPOINT_MAP,并调整代码结构

package com.example.im.endpoint;

import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint published at {@code /ws}.
 *
 * <p>Every connection is registered twice: locally in {@link #WEB_SOCKET_ENDPOINT_MAP}
 * and in the Redis bitmap stored under {@link #ONLINE_USER}.
 *
 * <p>The collaborators are injected through instance setters but kept in static
 * fields; presumably this is so that endpoint instances that are not created by
 * Spring can still reach them (review: confirm against the container in use).
 *
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    /**
     * Endpoints registered by {@link #onOpen(Session)}, keyed by
     * {@code userName.hashCode()}. Two user names with the same hash code overwrite
     * each other; the author accepts this as a low-probability event.
     */
    public static final ConcurrentHashMap<Integer, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();

    /**
     * Redis key of the bitmap that records which users are online.
     */
    public static final String ONLINE_USER = "cus:ws:online-user";

    private Session session;
    private static WebSocketMessageService webSocketMessageService;

    private static RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        WebSocketEndpoint.redisTemplate = redisTemplate;
    }

    /**
     * Maps a user-name hash code to the bit offset used inside the
     * {@link #ONLINE_USER} bitmap.
     *
     * <p>{@code String.hashCode()} can be negative, and Redis rejects negative
     * SETBIT/GETBIT offsets. The hash is therefore read as an unsigned 32-bit
     * value. Non-negative hash codes map to themselves, so offsets written
     * before this change stay valid. Any code that reads the bitmap has to use
     * the same mapping.
     *
     * <p>NOTE(review): large offsets make Redis grow the bitmap up to the offset;
     * confirm the resulting memory footprint is acceptable.
     *
     * @param userNameHashCode result of {@code userName.hashCode()}
     * @return offset in the range {@code 0 .. 2^32 - 1}
     */
    public static long onlineBitOffset(int userNameHashCode) {
        return Integer.toUnsignedLong(userNameHashCode);
    }

    /**
     * Handles a newly opened connection: marks the user online in Redis and
     * registers this endpoint locally.
     *
     * @param session the session that was opened
     */
    @OnOpen
    public void onOpen(Session session) {
        String userName = getUserName(session);
        logger.info("The connection is successful:{}", userName);
        this.session = session;
        // Hash collisions between user names are possible but considered negligible.
        int hashCode = userName.hashCode();
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineBitOffset(hashCode), true);
        WEB_SOCKET_ENDPOINT_MAP.put(hashCode, this);
    }

    /**
     * Handles a closed connection: clears the user's online bit in Redis and
     * removes the local registration.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        String userName = getUserName(session);
        int hashCode = userName.hashCode();
        redisTemplate.opsForValue().setBit(ONLINE_USER, onlineBitOffset(hashCode), false);
        WEB_SOCKET_ENDPOINT_MAP.remove(hashCode);
        logger.info("Disconnect:{}", userName);
    }

    /**
     * Forwards a received text message, together with the sender's user name,
     * to the message service.
     *
     * @param message message content
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        String sendUserName = getUserName(session);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    /**
     * Reads the first {@code userName} request parameter of the session.
     *
     * @param session session to inspect
     * @return the user name, or {@code "anonymous_users"} when the parameter is absent
     */
    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName"))
                .orElseGet(ArrayList::new)
                .stream()
                .findFirst()
                .orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

重复代码提取

在增加了stream和redis渠道后,消息监听处的代码有大段重复,可以进行提取处理

  • MessageInfoUtil
package com.example.im.infra.executor.send.util;

import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Helper shared by the message listeners: decodes a serialized {@link MessageInfo}
 * and dispatches it through {@link DefaultSendExecutor} according to its sending scope.
 *
 * @author PC
 */
@Component
public class MessageInfoUtil {
    private final static Logger logger = LoggerFactory.getLogger(MessageInfoUtil.class);
    private DefaultSendExecutor defaultSendExecutor;

    @Autowired
    public void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {
        this.defaultSendExecutor = defaultSendExecutor;
    }

    /**
     * Decodes the UTF-8 JSON payload into a {@link MessageInfo} and sends it to
     * a single user or to everybody, depending on {@code getScopeOfSending()}.
     *
     * <p>A payload that deserializes to {@code null}, or that carries no sending
     * scope, is logged and skipped instead of failing with a
     * {@link NullPointerException} inside the listener.
     *
     * @param messageInfoByte UTF-8 encoded JSON representation of a {@link MessageInfo}
     * @return the decoded message; {@code null} only if the JSON helper returned {@code null}
     */
    public MessageInfo sendMessageByMessageInfoByte(byte[] messageInfoByte) {
        String messageJson = new String(messageInfoByte, StandardCharsets.UTF_8);
        MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {
        });
        if (messageInfo == null || messageInfo.getScopeOfSending() == null) {
            // Nothing can be dispatched without a scope; keep the listener alive.
            logger.warn("message ignored: payload has no sending range");
            return messageInfo;
        }
        switch (messageInfo.getScopeOfSending()) {
            case USER:
                defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            case ALL:
                defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            default:
                // Not expected unless ScopeOfSending has been extended; a hook for
                // additional sending scopes could be added here later.
                logger.warn("invalid sending range:{}", messageInfo.getScopeOfSending().getScopeCode());
                break;
        }
        return messageInfo;
    }
}
  • RedisMessageListener
package com.example.im.infra.executor.send.redis;

import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * Redis {@link MessageListener} that hands the body of every received message
 * to {@link MessageInfoUtil}.
 *
 * @author PC
 */
@Component
public class RedisMessageListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);

    private MessageInfoUtil messageInfoUtil;

    @Autowired
    public void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {
        this.messageInfoUtil = messageInfoUtil;
    }

    /**
     * Decodes and dispatches one Redis message.
     *
     * @param message the received message; only its body is used
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        logger.debug("send redis info");
        final byte[] body = message.getBody();
        messageInfoUtil.sendMessageByMessageInfoByte(body);
    }
}
  • StreamMessageListener
package com.example.im.infra.executor.send.stream;

import com.example.im.infra.executor.send.util.MessageInfoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Function;

/**
 * Message-queue listener: exposes a reactive function bean that hands the payload
 * of every incoming message to {@link MessageInfoUtil}.
 *
 * @author PC
 */
@Component
public class StreamMessageListener {
    // Was created for StreamSendExecutor.class, which attributed this class's
    // log lines to the wrong logger.
    private final static Logger logger = LoggerFactory.getLogger(StreamMessageListener.class);

    private MessageInfoUtil messageInfoUtil;

    @Autowired
    public void setMessageInfoUtil(MessageInfoUtil messageInfoUtil) {
        this.messageInfoUtil = messageInfoUtil;
    }

    /**
     * Builds the consumer function for the incoming message stream.
     *
     * <p>The debug line is written per received message (matching the Redis
     * listener) rather than once when the bean is created. {@code doOnNext} is
     * used because the decoded message is not needed downstream; the returned
     * {@link Mono} only signals completion or error.
     *
     * @return function that dispatches every message of the flux and completes with it
     */
    @Bean
    public Function<Flux<Message<byte[]>>, Mono<Void>> listener() {
        return messageInfoFlux -> messageInfoFlux
                .doOnNext(message -> {
                    logger.debug("send stream info");
                    messageInfoUtil.sendMessageByMessageInfoByte(message.getPayload());
                })
                .then();
    }
}

未用到的bean不加载

当未用到某些渠道时,无需进行相关配置,如使用了redis渠道,yml中就无需配置kafka信息

Redis渠道

调整config文件,当渠道非redis时对redis渠道相关bean不进行加载

  • WebSocketConfig
package com.example.im.config;

import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.annotation.Resource;

/**
 * WebSocket-related bean definitions: the server endpoint exporter and the
 * task executor configured from {@code WebSocketProperties}.
 *
 * @author PC
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Resource
    private WebSocketProperties webSocketProperties;

    /**
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

    /**
     * Builds the thread pool from the executor section of the WebSocket properties.
     *
     * @return the configured thread pool
     */
    @Bean
    public TaskExecutor taskExecutor() {
        final WebSocketProperties.ExecutorProperties settings = webSocketProperties.getExecutorProperties();
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Fixed settings: thread naming, rejection policy and shutdown behaviour
        // (wait for submitted tasks to finish before the pool is closed).
        pool.setThreadNamePrefix("im-");
        pool.setRejectedExecutionHandler(new ImRejectExecutionHandler());
        pool.setWaitForTasksToCompleteOnShutdown(true);

        // Sizing taken from configuration: core/max threads, queue capacity and
        // keep-alive time in seconds.
        pool.setCorePoolSize(settings.getCorePoolSize());
        pool.setMaxPoolSize(settings.getMaxPoolSize());
        pool.setQueueCapacity(settings.getQueueCapacity());
        pool.setKeepAliveSeconds(settings.getKeepAliveSeconds());

        return pool;
    }
}
  • RedisConfiguration

缓存用到了redis,RedisTemplate需要加载

package com.example.im.config;

import com.example.im.infra.executor.send.redis.RedisMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis bean definitions. The {@link RedisTemplate} is always registered; the
 * listener container and adapter are only registered when
 * {@code cus.ws.communication-type} is {@code redis}.
 *
 * @author PC
 */
@Configuration
public class RedisConfiguration {

    /**
     * Creates a template that uses {@link StringRedisSerializer} for keys, values,
     * hash keys, hash values and as the default/string serializer.
     *
     * @param connectionFactory connection factory the template operates on
     * @return the configured RedisTemplate
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
        final StringRedisSerializer serializer = new StringRedisSerializer();
        final RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(serializer);
        template.setStringSerializer(serializer);
        template.setKeySerializer(serializer);
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(serializer);
        template.setHashValueSerializer(serializer);
        return template;
    }

    /**
     * Creates the listener container and subscribes the adapter to every channel
     * matching {@code redis-websocket-*}.
     *
     * @param connectionFactory connection factory used by the container
     * @param listenerAdapter   adapter that receives the messages
     * @return the configured container
     */
    @Bean
    @ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        final PatternTopic channels = new PatternTopic("redis-websocket-*");
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, channels);
        return listenerContainer;
    }

    /**
     * Wraps the Redis message listener in a {@link MessageListenerAdapter}.
     *
     * @param redisMessageListener delegate that processes the messages
     * @return adapter around the delegate
     */
    @Bean
    @ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
    MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {
        final MessageListenerAdapter adapter = new MessageListenerAdapter(redisMessageListener);
        return adapter;
    }
}
  • RedisMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisMessageListener implements MessageListener 
  • RedisSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "redis")
public class RedisSendExecutor extends AbstractBaseSendExecutor 

Kafka渠道

  • StreamMessageListener
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamMessageListener 
  • StreamSendExecutor
@Component
@ConditionalOnProperty(name = "cus.ws.communication-type", havingValue = "stream")
public class StreamSendExecutor extends AbstractBaseSendExecutor

参考资料

[1].im项目


http://www.kler.cn/news/359262.html

相关文章:

  • Mybatis框架用到了哪些设计模式?
  • LeetCode --- 419周赛
  • 企业信息化
  • 【mod分享】极品飞车10卡本峽谷白日mod,在白天竞速也是一种很棒的体验,更多的车辆,更高清的材质,更棒的灯光效果、同样光追
  • 【AI战略思考6】高筑墙,广积粮,静待周文王
  • LeetCode:3258.统计满足k约束的子串数量 I(滑动窗口 Java)
  • 土豆家族提权
  • pytorch安装GPU版本,指定设备
  • 8.扩散模型的未来---GPT及大模型(1)
  • Linux文件与fd
  • BERT的中文问答系统(羲和1.0)
  • 论文翻译 | OpenICL: An Open-Source Framework for In-context Learning
  • 鸿蒙NEXT开发-网络管理(基于最新api12稳定版)
  • 对BSV区块链下一代节点Teranode的答疑解惑(下篇)
  • 提升SQL技能,掌握数据分析
  • “敌人”追击状态的转换
  • 蒙特卡洛法面波频散曲线反演(matlab)
  • 【机器学习基础】全连接层
  • 【HarmonyOS NEXT】实现保存base64图片到图库
  • wordcloud 字体报错