当前位置: 首页 > article >正文

StarRocks 中如何做到查询超时(QueryTimeout)

背景

本文基于 StarRocks 3.1.7
主要是分析以下两种超时设置的方式:

  • SESSION 级别
    SET query_timeout = 10;
    SELECT sleep(20);
  • SQL 级别
  select /*+ SET_VAR(query_timeout=10) */ sleep(20); 

通过本文的分析大致可以了解到在Starrocks的FE端是如何进行Command的交互以及数据流走向,其他的命令也是可以举一反三

分析

query_timeout 命令解析

和Spark以及hive等但是解析一样,StarRocks也是采用的Anltr4进行语法的解析,
对于StarRocks来说, 对应的语法解析文件为 StarRocks.g4文件,那么其set query_time在如下的位置

setStatement
    : SET setVar (',' setVar)*
    ;

setVar
    : (CHAR SET | CHARSET | CHARACTER SET) (identifierOrString | DEFAULT)                       #setNames
    | NAMES (charset = identifierOrString | DEFAULT)
        (COLLATE (collate = identifierOrString | DEFAULT))?                                     #setNames
    | PASSWORD '=' (string | PASSWORD '(' string ')')                                           #setPassword
    | PASSWORD FOR user '=' (string | PASSWORD '(' string ')')                                  #setPassword
    | userVariable '=' expression                                                               #setUserVar
    | varType? identifier '=' setExprOrDefault                                                  #setSystemVar
    | systemVariable '=' setExprOrDefault                                                       #setSystemVar
    | varType? TRANSACTION transaction_characteristics                                          #setTransaction
    ;

继而可以找到对应的语法解析部分为 AstBuilder.java 中

 @Override
    public ParseNode visitSetSystemVar(StarRocksParser.SetSystemVarContext context) {
        NodePosition pos = createPos(context);
        if (context.systemVariable() != null) {
            VariableExpr variableDesc = (VariableExpr) visit(context.systemVariable());
            Expr expr = (Expr) visit(context.setExprOrDefault());
            return new SystemVariable(variableDesc.getSetType(), variableDesc.getName(), expr, pos);
        } else {
            Expr expr = (Expr) visit(context.setExprOrDefault());
            String variable = ((Identifier) visit(context.identifier())).getValue();
            if (context.varType() != null) {
                return new SystemVariable(getVariableType(context.varType()), variable, expr, pos);
            } else {
                return new SystemVariable(SetType.SESSION, variable, expr, pos);
            }
        }
    }

从以上所示,SET query_timeout = 10; 就会在语法层面解析为 new SystemVariable(SetType.SESSION, variable, expr, pos)

数据流向

以上只是说到了 SET query_timeout = 10 只会被解析为SystemVariable对应的java数据结构,但是一条SQL从客户端发送过来,是怎么一个数据流呢?
我们大概的捋一下:

StarRocksFE中新建QeService对象
   ||
   \/
 new NMysqlServer(port, scheduler, sslContext)
   ||
   \/
 new AcceptListener(connectScheduler, sslContext)
   ||
   \/
 AcceptListener.handleEvent
   ||
   \/
 context.startAcceptQuery(processor)
   ||
   \/
 NMysqlChannel.startAcceptQuery
   ||
   \/
 conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor))
   ||
   \/
 ReadListener.handleEvent
   ||
   \/
 connectProcessor.processOnce()
   ||
   \/
 connectProcessor.dispatch
   ||
   \/
 connectProcessor.handleQuery
   ||
   \/
 stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());
   ||
   \/
 StmtExecutor.execute()
   ||
   \/
 StatementPlanner.plan(parsedStmt, context)
   ||
   \/
 StmtExecutor.handleSetStmt()
   ||
   \/
 SetExecutor.execute // 会设置到变量的keyValue到`ConnectContext`的`SystemVariable`变量中,后续会或获取对应的值

query_timeout 怎么生效

还是定位到StarRocksFE.java中:

ExecuteEnv.setup();

该方法是整个执行环境的基础设置。其中会有ConnectScheduler的初始化:

public ConnectScheduler(int maxConnections) {
        this.maxConnections = new AtomicInteger(maxConnections);
        numberConnection = new AtomicInteger(0);
        nextConnectionId = new AtomicInteger(0);
        // Use a thread to check whether connection is timeout. Because
        // 1. If use a scheduler, the task maybe a huge number when query is messy.
        //    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
        // 2. Use a thread to poll maybe lose some accurate, but is enough to us.
        ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
                "Connect-Scheduler-Check-Timer", true);
        checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
    }

这里有个定时线程池去进行timeout的检查,间隔是一秒。具体的检查机制在TimeoutChecker类中:

private class TimeoutChecker extends TimerTask {
        @Override
        public void run() {
            try {
                long now = System.currentTimeMillis();
                synchronized (ConnectScheduler.this) {
                    //Because unregisterConnection will be callback in NMysqlChannel's close,
                    //unregisterConnection will remove connectionMap (in the same thread)
                    //This will result in a concurrentModifyException.
                    //So here we copied the connectionIds to avoid removing iterator during operate iterator
                    ArrayList<Long> connectionIds = new ArrayList<>(connectionMap.keySet());
                    for (Long connectId : connectionIds) {
                        ConnectContext connectContext = connectionMap.get(connectId);
                        connectContext.checkTimeout(now);
                    }
                }
            } catch (Throwable e) {
                //Catch Exception to avoid thread exit
                LOG.warn("Timeout checker exception, Internal error : " + e.getMessage());
            }
        }
    }

主要逻辑就是从connectionMap中获取对应的ConnectContext,从而调用ConnectContext.checkTimeout方法检查是否超时。
checkTimeout方法主要是通过sessionVariable.getQueryTimeoutS()获取设置的超时时间,如果超时,则调用StmtExecutor.cancel,继而调用Coordinator.cancel
所以现在就存在一个问题: 当前连接的ConnectContext什么时候被集成到 connectionMap中去的?
还是回到流程 AcceptListener.handleEvent 中去:

    connectScheduler.submit(context);
    ...
    if (connectScheduler.registerConnection(context)) {
            MysqlProto.sendResponsePacket(context);
            connection.setCloseListener(
                    streamConnection -> connectScheduler.unregisterConnection(context));
    } else {
    ...

这里的submit 方法会生成context的conectionId.
registerConnection方法会把当前 ConnectionContext 的id和 ConnectionContext 组成KeyValue对并放置到connectionMap

至此 SET query_timeout = 10 整体的数据流就结束了,待在同一个连接中进行select 操作的时候,就会根据执行的长短进行超时处理了。

注意:
对于 select /*+ SET_VAR(query_timeout=10) */ sleep(20); 这种情况的解析,是通过 HintCollector来解析的。
词法解析是在StarRocksLex.g4 中,

OPTIMIZER_HINT
    : '/*+' .*? '*/' -> channel(2)
    ;

对于获取hint是通过HintCollectorextractHintToRight获取的:

 private void extractHintToRight(ParserRuleContext ctx) {
        Token semi = ctx.start;
        int i = semi.getTokenIndex();
        List<Token> hintTokens = tokenStream.getHiddenTokensToRight(i, HINT_CHANNEL);
        if (hintTokens != null) {
            contextWithTokenMap.computeIfAbsent(ctx, e -> new ArrayList<>()).addAll(hintTokens);
        }
    }

对应的解析在:SqlParser.parseWithStarRocksDialect 方法中:

  HintCollector collector = new HintCollector((CommonTokenStream) parser.getTokenStream());
            collector.collect(singleStatementContexts.get(idx));

  AstBuilder astBuilder = new AstBuilder(sessionVariable.getSqlMode(), collector.getContextWithHintMap());

AstBuilder 中会存储 hint到 hintMap 变量中,而在 visitQuerySpecification方法中

        selectList.setOptHints(extractVarHints(hintMap.get(context)));

从而在StmtExecutor.execute中会调用 optHints = selectRelation.getSelectList().getOptHints();获取对应的hint,

 if (isQuery &&
          ((QueryStatement) parsedStmt).getQueryRelation() instanceof SelectRelation) {
      SelectRelation selectRelation = (SelectRelation) ((QueryStatement) parsedStmt).getQueryRelation();
      optHints = selectRelation.getSelectList().getOptHints();
  }
  if (optHints != null) {
      LOG.error("optHints: parsedStmt:" + parsedStmt.getOrigStmt() +"  "+ optHints.size());
      });
      SessionVariable sessionVariable = (SessionVariable) sessionVariableBackup.clone();
      for (String key : optHints.keySet()) {
          VariableMgr.setSystemVariable(sessionVariable,
                  new SystemVariable(key, new StringLiteral(optHints.get(key))), true);
      }
      context.setSessionVariable(sessionVariable);

这样 hint相关的变量就设置到ConnectContextSessionVariable中了,后续的流程和之前的一致。


http://www.kler.cn/a/328988.html

相关文章:

  • centos7执行yum操作时报错Could not retrieve mirrorlist http://mirrorlist.centos.org解决
  • 单调栈详解
  • 简洁实用的wordpress外贸模板
  • c++常见设计模式之装饰器模式
  • 计算机视觉——Intel RealSense D435的使用及python环境下的实现
  • 什么是HTTP3?
  • STM32器件支持包安装,STLINK/JLINK驱动安装
  • matlab r2024a、matlab R2024b保姆级安装教程
  • 滚雪球学MySQL[1.3讲]:MySQL客户端工具:详解与实践
  • 0基础跟德姆(dom)一起学AI 机器学习02-KNN算法
  • leetcode热题100.最长公共子序列
  • OpenFeign微服务部署
  • 自动驾驶系列—自动驾驶MCU架构全方位解析:从单核到多核的选型指南与应用实例
  • 前端——测试与打包时静态资源引用路径
  • Docker学习和部署ry项目
  • 磁编码器磁铁要求和安装要求
  • 安装pymssql
  • 简单两步,Spring Boot 定时任务也能动态设置
  • Springboot3保存日志到数据库
  • 力扣题解 983
  • MySQL-联合查询
  • PHP哪种加密扩展可以生成和验证数字签名
  • CSS中字体图标的使用
  • 长效ip的特征除了稳定还有什么
  • HTTP 和 HTTPS 协议的区别?
  • JavaWeb 12.Tomcat10