spark的stage划分的原理
在 Apache Spark 中,stage
是执行作业时的重要执行单元。一个 Spark 作业会被划分为若干个 stage
,每个 stage
由一组可以并行执行的任务组成。这种划分主要依赖于 RDD 中的操作类型(窄依赖和宽依赖)。下面我们来讨论 Spark stage
的创建和划分的原理以及代码实现的核心逻辑。
Spark Stage 划分的原理
-
RDD 依赖(窄依赖和宽依赖):
- Spark 中,RDD 可以有两种依赖关系:
- 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如
map
、filter
等。 - 宽依赖(wide dependency):父 RDD 的每个分区可能被多个子 RDD 的分区使用,典型的操作如
reduceByKey
、groupByKey
等,这类操作会触发shuffle
。
- 窄依赖(narrow dependency):父 RDD 的每个分区至多被子 RDD 的一个分区使用,典型的操作如
- 窄依赖的 RDD 操作可以被划分到同一个
stage
中,而宽依赖的 RDD 操作会触发shuffle
,导致stage
划分。
- Spark 中,RDD 可以有两种依赖关系:
-
DAG(有向无环图):
Spark 的作业会构建一个 RDD 的依赖图(DAG)。这个 DAG 中每个 RDD 的窄依赖操作会被合并成一个stage
,宽依赖操作会划分出不同的stage
,并在两个stage
之间插入shuffle
。 -
Stage
划分规则:- 每当遇到一个宽依赖(如
reduceByKey
、join
、groupByKey
等),Spark 会创建一个新的stage
,并将之前的 RDD 操作划分到一个stage
中,形成一个有序的stage
执行链。 stage
划分的核心任务是:将窄依赖操作尽可能合并到一起,直到遇到需要shuffle
的宽依赖操作。
- 每当遇到一个宽依赖(如
Spark Stage
划分的核心代码逻辑
Spark 的 DAG 划分及 stage
划分主要在 DAGScheduler
中实现。DAGScheduler
是 Spark 作业调度的核心组件,负责将逻辑作业(job)划分为多个 stage
,并调度这些 stage
执行。
以下是 Spark 3.x 版本中有关 stage
划分的核心逻辑及其简化代码片段。
1. DAGScheduler 类
DAGScheduler
类位于 org.apache.spark.scheduler
包下,它负责管理 RDD 依赖关系并创建 stage
。DAGScheduler
会根据 RDD 的依赖图和操作类型,生成任务的 DAG 并划分 stage
。
class DAGScheduler(
// 参数略...
) extends Logging {
// stage 列表
private val stages = new HashMap[StageId, Stage]()
// 提交 Job 时触发的函数
def submitJob(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, _) => Unit,
properties: Properties = null): JobWaiter[_] = {
// 根据 RDD 和依赖关系生成最终的 ResultStage
val finalStage = createFinalStage(rdd, partitions, callSite)
// 提交该 stage 执行
submitStage(finalStage)
}
// 创建 ResultStage 和后续的 Stage
private def createFinalStage(
rdd: RDD[_],
partitions: Seq[Int],
callSite: CallSite): ResultStage = {
// 创建该作业的最终的 stage,并递归创建所有依赖的 stage
val finalStage = newStage(rdd, partitions)
finalStage
}
// 递归生成各个 Stage,核心逻辑
private def newStage(rdd: RDD[_], partitions: Seq[Int]): Stage = {
// 检查缓存,避免重复生成 Stage
stages.getOrElseUpdate(rdd.id, {
val shuffleDeps = getShuffleDependencies(rdd)
// 如果存在宽依赖,则要划分为不同的 stage
if (shuffleDeps.nonEmpty) {
val parentStages = shuffleDeps.map { dep =>
newStage(dep.rdd, dep.rdd.partitions.indices)
}
val newStage = new ShuffleMapStage(rdd, parentStages)
stages(newStage.id) = newStage
newStage
} else {
// 如果只有窄依赖,当前操作在同一个 stage 内
val parentStages = getNarrowDependencies(rdd).map { dep =>
newStage(dep.rdd, dep.rdd.partitions.indices)
}
val newStage = new ResultStage(rdd, parentStages)
stages(newStage.id) = newStage
newStage
}
})
}
// 获取 RDD 的 shuffle 依赖(宽依赖)
private def getShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {
rdd.dependencies.collect {
case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep
}
}
// 获取 RDD 的窄依赖
private def getNarrowDependencies(rdd: RDD[_]): List[Dependency[_]] = {
rdd.dependencies.collect {
case narrowDep: NarrowDependency[_] => narrowDep
}
}
}
2. Stage 划分的基本过程
-
RDD 依赖遍历:通过
newStage
函数递归遍历 RDD 的依赖关系,将遇到的每一个shuffle
依赖(宽依赖)创建一个新的ShuffleMapStage
,而ResultStage
则用于最终计算结果。 -
宽依赖处理:当遇到宽依赖(
ShuffleDependency
),说明需要进行shuffle
,因此要创建一个新的stage
。 -
窄依赖处理:当只有窄依赖时,RDD 可以继续合并在当前的
stage
中。
3. ShuffleMapStage 和 ResultStage
ShuffleMapStage
和 ResultStage
是 Spark 中两种类型的 Stage
:
ShuffleMapStage
:处理宽依赖(shuffle
),该stage
会产生shuffle
文件供下游stage
使用。ResultStage
:最终计算Action
(如collect
、saveAsTextFile
等)结果的stage
,是 DAG 中的最后一个stage
。
代码流程总结
DAGScheduler
在收到作业时,会从最后的Action
开始,通过递归函数newStage
,根据 RDD 的依赖关系逐步向上遍历。- 当遇到
shuffle
依赖时,会将其划分为不同的stage
,每个shuffle
依赖会产生一个ShuffleMapStage
。 - 所有的窄依赖 RDD 操作则合并为一个
stage
,在同一个stage
中执行。 submitStage
负责将划分好的stage
发送给 TaskScheduler,TaskScheduler 则进一步调度任务到集群执行。
总结
- 窄依赖操作:操作在同一个
stage
中执行,尽可能合并,减少shuffle
。 - 宽依赖操作:每个宽依赖会触发新的
stage
,并引入shuffle
,每个shuffle
会将数据重新分布给后续的stage
。 DAGScheduler
的作用:DAG 调度器负责将 RDD 操作链划分为多个stage
,并根据依赖关系生成一个 DAG。