
A quick walkthrough: how Calcite and Flink turn SQL text into an execution plan

Contents

      • What you can learn
      • Test code
      • Background knowledge
      • SQL transformation flow
      • Questions

What you can learn

  • How SQL is turned into an execution plan, step by step
  • Which optimizer phases exist and which optimization rules they apply
  • How Calcite and Flink are wired together

Test code

// requires: import org.apache.flink.table.api.*;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();
Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();

// register two filesystem-backed CSV tables
tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema)
        .option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());

tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1)
        .option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());

// explain the join, including estimated costs for the physical operators
String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);
System.out.println(cost);
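The exact operators and numbers depend on the Flink version and on the CSV data, but the printed explain result has roughly this shape (schematic, not real output):

== Abstract Syntax Tree ==
LogicalProject(count=[$0], word=[$1], id=[$2], name=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
   ...

== Optimized Physical Plan ==
HashJoin(...): rowcount = ..., cumulative cost = {... rows, ... cpu, ... io, ... network, ... memory}
   ...

== Optimized Execution Plan ==
...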

Background knowledge

You should know the Calcite basics, such as the AST, RelNode, HepPlanner, and so on.
You should also know some Flink and Flink SQL fundamentals.

SQL transformation flow

After the SQL has passed through every optimizer program registered in Flink, it becomes a physical plan; to turn that into executable code, it still has to go through code generation.
[Figure: SQL-to-execution-plan transformation flow]
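Internally, Flink performs the same high-level steps that plain Calcite does: parse the SQL text into a SqlNode AST, validate it against the catalog, convert it into a RelNode tree, and then optimize it. As a rough, Flink-independent sketch of those first steps using Calcite's Frameworks API (the USERS table, its columns, and the query are made up for illustration; Flink drives the same machinery through its own parser and planner classes):

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalciteFlowSketch {
    public static void main(String[] args) throws Exception {
        // register a dummy table USERS(ID INT, NAME VARCHAR) in an in-memory schema
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        rootSchema.add("USERS", new AbstractTable() {
            @Override
            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                return typeFactory.builder()
                        .add("ID", SqlTypeName.INTEGER)
                        .add("NAME", SqlTypeName.VARCHAR)
                        .build();
            }
        });

        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode ast = planner.parse("select name from users where id = 1"); // SQL text -> SqlNode (AST)
        SqlNode validated = planner.validate(ast);                          // resolve names/types against the schema
        RelRoot relRoot = planner.rel(validated);                           // SqlNode -> logical RelNode tree

        // prints something like LogicalProject -> LogicalFilter -> LogicalTableScan
        System.out.println(RelOptUtil.toString(relRoot.rel));
    }
}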

Questions

  • Question 1: FlinkBatchProgram
    All of Flink's batch optimizer phases are registered in this class; its buildProgram method chains the named phases below together, and the batch planner then runs them in order.
object FlinkBatchProgram {
  val SUBQUERY_REWRITE = "subquery_rewrite"
  val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
  val DECORRELATE = "decorrelate"
  val DEFAULT_REWRITE = "default_rewrite"
  val PREDICATE_PUSHDOWN = "predicate_pushdown"
  val JOIN_REORDER = "join_reorder"
  val JOIN_REWRITE = "join_rewrite"
  val PROJECT_REWRITE = "project_rewrite"
  val WINDOW = "window"
  val LOGICAL = "logical"
  val LOGICAL_REWRITE = "logical_rewrite"
  val TIME_INDICATOR = "time_indicator"
  val PHYSICAL = "physical"
  val PHYSICAL_REWRITE = "physical_rewrite"
  val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"
  val RUNTIME_FILTER = "runtime_filter"
}
  • Question 2: how are Calcite's planners and Flink combined?
    The logical and physical phases use the VolcanoPlanner, which combines rules with a cost model.
    All of the remaining phases use the HepPlanner, which is driven purely by rules.
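For intuition, this is roughly what a pure rule-driven HepPlanner pass looks like when Calcite is used directly (a sketch that assumes relRoot is the RelRoot produced by the Frameworks example above; the rules here are arbitrary Calcite core rules picked for illustration, not the exact ones Flink registers, and on such a tiny plan they change nothing, but the mechanics are the same):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

// build a program that applies the listed rules until no more matches (fixpoint)
HepProgram program = new HepProgramBuilder()
        .addRuleInstance(CoreRules.PROJECT_MERGE)
        .addRuleInstance(CoreRules.FILTER_MERGE)
        .build();
HepPlanner hepPlanner = new HepPlanner(program);
hepPlanner.setRoot(relRoot.rel);                 // the logical plan to rewrite
RelNode rewritten = hepPlanner.findBestExp();    // purely rule-based, no cost comparison

The VolcanoPlanner behind the logical/physical phases additionally asks each RelNode for its cost (computeSelfCost, as in BatchPhysicalTableSourceScan below) and keeps the cheapest equivalent plan.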

  • Question 3: why does a LogicalProject RelNode disappear after project_rewrite?
    Because the topmost LogicalProject produced by `select *` just outputs every column of the join unchanged; such an identity projection is redundant, so this step removes it.
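Schematically (the operator details below are made up for illustration, not actual explain output):

before project_rewrite:
LogicalProject(count=[$0], word=[$1], id=[$2], name=[$3])   <- identity projection from `select *`
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
   ...

after project_rewrite:
LogicalJoin(condition=[=($0, $2)], joinType=[inner])
   ...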

  • Question 4: how does the physical plan turn into execution code?
    See the BatchPhysicalTableSourceScan class below: each physical RelNode translates itself into an ExecNode via translateToExecNode, and the ExecNode layer then produces the actual runtime Transformations / generated code.

class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {

  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    val rowCnt = mq.getRowCount(this)
    if (rowCnt == null) {
      return null
    }
    val cpu = 0
    val rowSize = mq.getAverageRowSize(this)
    val size = rowCnt * rowSize
    planner.getCostFactory.makeCost(rowCnt, cpu, size)
  }

  // this is where execution-code generation starts: the physical node becomes an ExecNode
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)

    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}
  • Question 5: why is the aa_user table broadcast, and where is that implemented?

It is implemented by the BatchPhysicalHashJoinRule rule.

The core code:

val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)
val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)

// if it is not with hint, just check size of left and right side by statistic and config
// if leftSize or rightSize is unknown, cannot use broadcast
if (leftSize == null || rightSize == null) {
  return (false, false)
}

val threshold =
  tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)

val rightSizeSmallerThanThreshold = rightSize <= threshold
val leftSizeSmallerThanThreshold = leftSize <= threshold
val leftSmallerThanRight = leftSize < rightSize

join.getJoinType match {
  case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
  case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
  case JoinRelType.FULL => (false, false)
  case JoinRelType.INNER =>
    (
      leftSizeSmallerThanThreshold
        || rightSizeSmallerThanThreshold,
      leftSmallerThanRight)
  // left side cannot be used as build side in SEMI/ANTI join.
  case JoinRelType.SEMI | JoinRelType.ANTI =>
    (rightSizeSmallerThanThreshold, false)
}
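So for this INNER join, the smaller side becomes the build (broadcast) side as long as its estimated binary-row size is under table.optimizer.join.broadcast-threshold, which is why aa_user ends up broadcast. If you want to experiment, the threshold can be changed on the table config; a sketch, assuming a Flink version where TableConfig#set(ConfigOption, value) is available (the 10 MB value is arbitrary):

// requires: import org.apache.flink.table.api.config.OptimizerConfigOptions;
// raise the broadcast threshold to 10 MB before calling explainSql
tableEnvironment.getConfig().set(
        OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
        10L * 1024 * 1024);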

The size estimate mainly comes from this method:

  def binaryRowRelNodeSize(relNode: RelNode): JDouble = {
    val mq = relNode.getCluster.getMetadataQuery
    val rowCount = mq.getRowCount(relNode)
    if (rowCount == null) {
      null
    } else {
      rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
    }
  }
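For example (numbers invented for illustration): if mq.getRowCount estimates 1,000 rows and the average binary row size is 32 bytes, the side size is 1,000 × 32 = 32,000 bytes, which is far below the default broadcast threshold of roughly 1 MB, so that side is eligible to be broadcast.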

In the end this comes down to the FlinkRelMdColumnNullCount class,
which reads the statistics off the ts: TableScan object.
So where do the statistics on the ts object get assigned? Look at the FlinkRecomputeStatisticsProgram class.

class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {

  override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF

  /**
   * Gets the null count of the given column in TableScan.
   *
   * @param ts
   *   TableScan RelNode
   * @param mq
   *   RelMetadataQuery instance
   * @param index
   *   the index of the given column
   * @return
   *   the null count of the given column in TableScan
   */
  def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {
    Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])
    val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]
    val fieldNames = relOptTable.getRowType.getFieldNames
    Preconditions.checkArgument(index >= 0 && index < fieldNames.size())
    val fieldName = fieldNames.get(index)
    val statistic = relOptTable.getStatistic
    val colStats = statistic.getColumnStats(fieldName)
    if (colStats != null && colStats.getNullCount != null) {
      colStats.getNullCount.toDouble
    } else {
      null
    }
  }
}

The statistics of ts are assigned here; ultimately this calls into the concrete file system to determine how many rows the file has.

    private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
        final RelOptTable scanTable = scan.getTable();
        if (!(scanTable instanceof TableSourceTable)) {
            return scan;
        }

        FlinkContext context = ShortcutUtils.unwrapContext(scan);
        TableSourceTable table = (TableSourceTable) scanTable;
        boolean reportStatEnabled =
                context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)
                        && table.tableSource() instanceof SupportsStatisticReport;

        SourceAbilitySpec[] specs = table.abilitySpecs();
        PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);

        FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);
        TableStats newTableStat =
                recomputeStatistics(
                        table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);
        FlinkStatistic newStatistic =
                FlinkStatistic.builder()
                        .statistic(table.getStatistic())
                        .tableStats(newTableStat)
                        .build();
        TableSourceTable newTable = table.copy(newStatistic);
        return new LogicalTableScan(
                scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);
    }
