SparkSQL的执行过程:从源码角度解析逻辑计划、优化计划和物理计划
SparkSQL的执行过程可以分为以下几个阶段:从用户的SQL语句到最终生成的RDD执行,涵盖逻辑计划、优化计划和物理计划。以下是详细的源码角度解析:
1. 解析阶段(Parsing)
- SQL语句解析:Spark 使用
Catalyst
引擎将用户输入的 SQL 语句解析为 抽象语法树(AST, Abstract Syntax Tree)。 - 代码位置:
org.apache.spark.sql.catalyst.parser.SqlBase.g4
定义了语法规则,SqlParser
使用 ANTLR 工具解析 SQL。 - 输出结果:解析后的
LogicalPlan
,表示 SQL 的初始逻辑计划。
2. 分析阶段(Analysis)
- 任务:通过元数据和表的 Schema 校验逻辑计划中的字段、函数等,并为计划补充缺失信息。
- 关键组件:
- Catalog:Spark 用 Catalog 管理表的元数据。
- Analyzer:负责逻辑计划的语义分析。
- 规则应用:
Analyzer
通过一系列规则(rules)完成字段校验、类型推断。
- 代码位置:
org.apache.spark.sql.catalyst.analysis.Analyzer
。
- 输出结果:生成一个经过校验和补全的逻辑计划,称为 Analyzed Logical Plan。
3. 逻辑优化阶段(Logical Optimization)
- 任务:对逻辑计划进行规则化优化,比如谓词下推、列剪裁、常量折叠等。
- 关键组件:
- Optimizer:基于规则的优化器,应用各种优化规则。
- 典型优化规则:
- 谓词下推:将
Filter
操作下推到最靠近数据源的位置。 - 列剪裁:只保留查询所需的列。
- 常量折叠:将表达式中的常量计算提前。
- 谓词下推:将
- 代码位置:
org.apache.spark.sql.catalyst.optimizer.Optimizer
。
- 输出结果:一个经过优化的逻辑计划,称为 Optimized Logical Plan。
4. 物理计划生成阶段(Physical Planning)
- 任务:将逻辑计划转化为物理计划,选择最优执行方案。
- 关键组件:
- Planner:为逻辑操作选择物理操作的执行方式。
- 成本模型:基于代价估算,选择最佳的物理计划。例如:选择
SortMergeJoin
或BroadcastHashJoin
。
- 代码位置:
org.apache.spark.sql.execution.SparkPlanner
。org.apache.spark.sql.execution.strategy
包含了具体的物理计划生成策略。
- 输出结果:多个候选的物理计划,最终选定一个最优计划作为 Physical Plan。
5. 代码生成阶段(Code Generation)
- 任务:对物理计划中的部分操作生成更高效的 Java 字节码(bytecode)。
- 关键组件:
- WholeStageCodegen:SparkSQL 中的重要优化,能将多个操作结合为单一代码片段以减少任务调度的开销。
- 代码位置:
org.apache.spark.sql.execution.WholeStageCodegenExec
。
- 输出结果:带有代码生成(Codegen)信息的物理计划。
6. RDD生成阶段(Execution Preparation)
- 任务:将物理计划转化为低层次的 RDD 操作。
- 关键组件:
- 每个
Exec
物理节点都会实现doExecute
方法,负责生成对应的 RDD。 - 示例:
ScanExec
节点生成数据源的 RDD,ProjectExec
节点生成投影操作的 RDD。
- 每个
- 代码位置:
- 各种执行节点的实现位于
org.apache.spark.sql.execution
包中。
- 各种执行节点的实现位于
- 输出结果:Spark 的执行引擎中直接运行的 RDD DAG。
7. 执行阶段(Execution)
- 任务:提交作业并执行 RDD 转换。
- 过程:
- DAG 构建:根据 RDD 依赖构建执行 DAG。
- 任务调度:通过
TaskScheduler
提交任务到集群执行。
- 代码位置:
- RDD 转换由
org.apache.spark.rdd.RDD
的compute
方法完成。 - 调度部分由
DAGScheduler
和TaskScheduler
完成。
- RDD 转换由
总结执行流程示意图
-
解析阶段
SELECT name FROM students WHERE age > 18;
↓
-
初始逻辑计划
LogicalPlan(Filter(age > 18), Project(name))
↓
-
分析计划
LogicalPlan(Filter(students.age > 18), Project(students.name))
↓
-
优化计划
Filter(age > 18) -> Project(name) ↓ Pushed Filters -> Optimized LogicalPlan
-
物理计划
Scan Students RDD -> Apply Filters -> Project Columns
-
RDD 生成
studentsRDD.filter(age > 18).map(name => name)
通过以上步骤,SparkSQL 实现了从用户查询到集群执行的全过程,并通过 Catalyst 提供了高度灵活的优化和扩展能力。