当前位置: 首页 > article >正文

NebulaGraph学习笔记-SessionPool之getSession

之前在 NebulaGraph学习笔记-自定义SessionPool 这篇文章中实现了自定义的SessionPool,后续在使用中发现经常查询的都是用的同一个Session,以至于某一个节点的负载压力很大。于是看了一下获取Session的方法,发现是顺序遍历,后续可以调整遍历方式来改变。
  • 依赖包还是跟之前的一致
<!-- Client依赖包 -->
<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>client</artifactId>
    <version>3.8.4</version>
</dependency>
  • SessionPool会在构造函数中进行初始化
/**
 * init the SessionPool
 * this function is moved into SessionPool's constructor, no need to call it manually.
 */
@Deprecated
public boolean init() {
    if (hasInit.get()) {
        return true;
    }

    while (sessionList.size() < minSessionSize) {
        try {
            createSessionObject(SessionState.IDLE);
            idleSessionSize.incrementAndGet();
        } catch (Exception e) {
            log.error("SessionPool init failed. ");
            throw new RuntimeException("create session failed.", e);
        }
    }
    healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
            TimeUnit.SECONDS);
    sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
            TimeUnit.SECONDS);
    hasInit.compareAndSet(false, true);
    return true;
}


/**
 * create a {@link NebulaSession} with specified state
 *
 * @param state {@link SessionState}
 * @return NebulaSession
 */
private NebulaSession createSessionObject(SessionState state)
        throws ClientServerIncompatibleException, AuthFailedException,
        IOErrorException, BindSpaceFailedException {
    SyncConnection connection = new SyncConnection();
    int tryConnect = sessionPoolConfig.getGraphAddressList().size();
    // reconnect with all available address
    while (tryConnect-- > 0) {
        try {
            if (sessionPoolConfig.isEnableSsl()) {
                connection.open(getAddress(), sessionPoolConfig.getTimeout(),
                        sessionPoolConfig.getSslParam(),
                        sessionPoolConfig.isUseHttp2(),
                        sessionPoolConfig.getCustomHeaders());
            } else {
                connection.open(getAddress(), sessionPoolConfig.getTimeout(),
                        sessionPoolConfig.isUseHttp2(),
                        sessionPoolConfig.getCustomHeaders());
            }
            break;
        } catch (Exception e) {
            if (tryConnect == 0 || !reconnect) {
                throw e;
            } else {
                log.warn("connect failed, " + e.getMessage());
            }
        }
    }

    AuthResult authResult;
    try {
        authResult = connection.authenticate(sessionPoolConfig.getUsername(),
                sessionPoolConfig.getPassword());
    } catch (AuthFailedException e) {
        log.error(e.getMessage());
        if (e.getMessage().toLowerCase().contains("user not exist")
                || e.getMessage().toLowerCase().contains("invalid password")) {
            // close the session pool
            close();
        } else {
            // just close the connection
            connection.close();
        }
        throw e;
    }

    NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
            authResult.getTimezoneOffset(), state);
    ResultSet result = null;
    try {
        result = nebulaSession.execute(useSpace);
    } catch (IOErrorException e) {
        log.error("binding space failed,", e);
        nebulaSession.release();
        throw new BindSpaceFailedException("binding space failed:" + e.getMessage());
    }
    if (!result.isSucceeded()) {
        nebulaSession.release();
        throw new BindSpaceFailedException(result.getErrorMessage());
    }
    sessionList.add(nebulaSession);
    return nebulaSession;
}


public HostAddress getAddress() {
    List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
    int newPos = (pos.getAndIncrement()) % addresses.size();
    HostAddress hostAddress = addresses.get(newPos);
    log.info("ng address {} {} {} {}", pos.get(), newPos, hostAddress.getHost(), hostAddress.getPort());
    return hostAddress;
}
  • getSession用于为每次执行查询提供Session,可以看出每次都是从sessionList中顺序取出nebulaSession
/**
 * return an idle session
 */
private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
        AuthFailedException, IOErrorException, BindSpaceFailedException {
    int retry = sessionPoolConfig.getRetryConnectTimes();
    while (retry-- >= 0) {
        // if there are idle sessions, get session from queue
        if (idleSessionSize.get() > 0) {
             for (NebulaSession nebulaSession : sessionList) {
                 if (nebulaSession.isIdleAndSetUsed()) {
                     int currentIdleSessionSize = idleSessionSize.decrementAndGet();
                     log.info("ng session {} {}", currentIdleSessionSize, nebulaSession.getSessionID());
                     return nebulaSession;
                 }
             }
        }
        // if session size is less than max size, get session from pool
        if (sessionList.size() < maxSessionSize) {
            return createSessionObject(SessionState.USED);
        }
        // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
        try {
            Thread.sleep(sessionPoolConfig.getWaitTime());
        } catch (InterruptedException e) {
            log.error("getSession error when wait for idle sessions, ", e);
            throw new RuntimeException(e);
        }
    }

    // if session size is equal to max size and no idle session here, throw exception
    throw new RuntimeException("no extra session available");
}
  • 可以调整从sessionList中随机取出nebulaSession
/**
 * return an idle session
 */
private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
        AuthFailedException, IOErrorException, BindSpaceFailedException {
    int retry = sessionPoolConfig.getRetryConnectTimes();
    while (retry-- >= 0) {
        // if there are idle sessions, get session from queue
        if (idleSessionSize.get() > 0) {
            int[] randomInts = RandomUtil.randomInts(sessionList.size());
            for (int randomInt : randomInts) {
                NebulaSession nebulaSession = sessionList.get(randomInt);
                if (nebulaSession.isIdleAndSetUsed()) {
                    int currentIdleSessionSize = idleSessionSize.decrementAndGet();
                    log.debug("ng session {} {}", currentIdleSessionSize, nebulaSession.getSessionID());
                    return nebulaSession;
                }
            }
        }
        // if session size is less than max size, get session from pool
        if (sessionList.size() < maxSessionSize) {
            return createSessionObject(SessionState.USED);
        }
        // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
        try {
            Thread.sleep(sessionPoolConfig.getWaitTime());
        } catch (InterruptedException e) {
            log.error("getSession error when wait for idle sessions, ", e);
            throw new RuntimeException(e);
        }
    }

    // if session size is equal to max size and no idle session here, throw exception
    throw new RuntimeException("no extra session available");
}

http://www.kler.cn/a/579540.html

相关文章:

  • 【数据结构与算法】Java描述:第二节:LinkedList 链表
  • 【YOLOv12改进trick】三重注意力TripletAttention引入YOLOv12中,实现遮挡目标检测涨点,含创新点Python代码,方便发论文
  • OSPF报文分析
  • MySQL环境搭建和基本操作
  • 【大模型】WPS 接入 DeepSeek-R1详解,打造全能AI办公助手
  • vivado 充分利用 IP 核
  • 实时数据驱动的RAG应用
  • Python如何制作并查询sql数据库
  • 【初阶数据结构】二叉树的链式结构
  • 面试基础--Redis 缓存穿透、缓存击穿、缓存雪崩深度解析
  • LLM论文笔记 17: Program of Thoughts Prompting (PoT)
  • 在 Ubuntu 20.04 上交叉编译 Qt 5 应用,使其可在 Windows 运行
  • Elasticsearch如何删除字段
  • Linux系统基于ARM平台的LVGL移植
  • clickhouse 频繁刷新
  • 算法与数据结构(最长回文子串)
  • PTA L2一些题目
  • 学习网络安全需要哪些基础?
  • ubuntu直接安装mobaxterm
  • 大模型最新面试题系列:训练篇之模型监控与调试