当前位置: 首页 > article >正文

Spark 中所有用到了Job对象的组件模块和关系

        在 Apache Spark 中,Job 是调度和执行的核心对象之一,它代表了一次具体的数据计算操作。Job 不是独立存在的,而是与 Spark 内部多个组件、类和功能紧密集成。为了全面了解与 Job 相关的所有类、组件和功能,并解释它们之间的关系,我们需要从整个 Spark 的执行流程、架构层次和源代码入手。

1. Job 的概念与架构

        在 Spark 中,每个 Job 由用户操作触发,通常对应于一次 Action 操作(如 count()collect())。这些 Action 操作会让 Spark 将一系列的 RDD 转换操作(transformation,如 map()filter())封装成一个 Job,并通过调度系统执行。

        Job 是 Spark 的逻辑计算单元,但它的执行依赖于底层调度机制和任务划分。在底层,Job 被分解为多个 Stage,每个 Stage 包含一系列可并行执行的 Task。所有这些组件通过不同的类和模块协调工作。

2. Job 相关的核心类和组件

Job 涉及的主要类和组件包括以下几个重要部分:

  1. DAGSchedulerJob 的生成与调度管理器。
  2. Stage:将 Job 细化为可调度的执行阶段。
  3. Task 和 TaskSet:进一步将 Stage 分解为具体的计算任务,映射到集群上的不同节点执行。
  4. ActiveJob:表示运行时 Job 的封装,包含 Job 的执行状态、监听器等。
  5. TaskScheduler:任务的具体调度器,将 TaskSet 提交给执行器(executor)执行。
  6. MapOutputTracker:跟踪 Shuffle 数据的输出位置,用于 Stage 之间的依赖解析。
  7. SchedulerBackend:负责与集群管理器(如 YARN、Mesos、Kubernetes)交互,资源分配、任务调度。

下面逐一详细解释这些类及其与 Job 的关系。

3. DAGScheduler:作业调度器

        DAGScheduler 是 Spark 的核心调度器,它的主要职责是将用户提交的高层次操作(如 transformations 和 actions)解析为 JobStage 和 TaskDAGScheduler 处理了所有与 Job 相关的依赖、划分和调度工作。

  • Job 的创建:当用户调用 Action 时,DAGScheduler 会创建一个新的 Job
  • Stage 的划分DAGScheduler 负责将 DAG(有向无环图)划分为多个 Stage,每个 Stage 可以看作是一组可以并行执行的任务。宽依赖(如 shuffle 操作)会将 DAG 分割成不同的 Stage
  • Task 的生成与提交:每个 Stage 被划分为多个 TaskTask 会被 TaskScheduler 提交给具体的集群节点执行。
// DAGScheduler.scala (简化)
class DAGScheduler(
  taskScheduler: TaskScheduler,
  mapOutputTracker: MapOutputTracker,
  ...) {

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      ...) : JobWaiter[U] = {
    
    val jobId = nextJobId.getAndIncrement()
    val finalStage = createResultStage(rdd, func, partitions, jobId)
    val job = new Job(jobId, finalStage, ...)
    
    jobIdToActiveJob(jobId) = new ActiveJob(job, finalStage)
    submitStage(finalStage)
    
    return job.waiter
  }

  private def submitStage(stage: Stage): Unit = {
    if (stage.tasks == null) {
      stage.tasks = createShuffleMapTasks(stage)
    }
    taskScheduler.submitTasks(new TaskSet(stage.tasks, stage.id))
  }
}
  • Job 的调度与执行DAGScheduler 负责提交每个 Stage,并通过 TaskScheduler 调度任务的执行。每个 Stage 的任务执行完成后,如果该阶段是 Job 的最后阶段,则 Job 完成。

4. Stage:执行阶段

Stage 是 Spark 中 Job 的子单元,代表一组可以并行执行的任务。DAGScheduler 会将整个 Job 划分为多个 Stage。每个 Stage 通常对应于一组 RDD 的窄依赖操作。宽依赖操作(如 reduceByKeygroupBy)会引入新的 Stage

// Stage.scala
private[scheduler] class Stage(
  val id: Int,
  val rdd: RDD[_],
  val numTasks: Int,
  val parents: List[Stage],
  val jobId: Int) {

  var tasks: Array[Task[_]] = _

  def addParent(stage: Stage): Unit = {
    parents :+= stage
  }
}
  • 父子关系:每个 Stage 可以有多个父 Stage,表示依赖关系。Stage 之间通过 RDD 的依赖关系建立。

5. Task 和 TaskSet

Task 是 Spark 中的最小计算单元。每个 Stage 被细化为多个 Task,每个 Task 通常对应于 RDD 的一个分区。TaskSet 是一组可并行执行的 Task,它们会被提交给集群中的执行器执行。

// Task.scala
abstract class Task[T](val taskId: Int, val stageId: Int) extends Serializable {
  def run(taskContext: TaskContext): T
}

// TaskSetManager.scala
class TaskSetManager(taskSet: TaskSet, ...) {
  def resourceOffer(execId: String, host: String): Option[TaskDescription] = {
    val task = findTaskToRun(execId, host)
    Option(new TaskDescription(task))
  }
}
  • TaskSet 的提交TaskSet 会由 DAGScheduler 通过 TaskScheduler 提交,并在集群中执行。

6. TaskScheduler:任务调度器

TaskScheduler 负责具体的任务调度,它接收来自 DAGScheduler 的 TaskSet,并决定将这些任务分配到集群中的哪个节点执行。TaskScheduler 将任务提交给 SchedulerBackend,并通过集群管理器(如 YARN、Kubernetes)来启动和管理执行器。

// TaskSchedulerImpl.scala
class TaskSchedulerImpl(backend: SchedulerBackend, ...) extends TaskScheduler {
  override def submitTasks(taskSet: TaskSet): Unit = {
    val manager = new TaskSetManager(taskSet, maxTaskFailures)
    taskSetsByStageIdAndAttempt(taskSet.stageId)(taskSet.stageAttemptId) = manager
    backend.reviveOffers()
  }
}
  • 任务调度与执行器交互TaskScheduler 会使用 SchedulerBackend 与集群管理器进行通信,将任务分配给具体的执行器。

7. SchedulerBackend:与集群管理器的交互

SchedulerBackend 是 Spark 与集群管理器(如 YARN、Mesos、Kubernetes)之间的接口。它负责申请资源并将任务分发给集群中的执行器。

// CoarseGrainedSchedulerBackend.scala
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, ...) extends SchedulerBackend {
  override def start() {
    // 启动执行器
    super.start()
  }

  def launchTasks(taskDescriptions: Seq[TaskDescription]) {
    for (desc <- taskDescriptions) {
      executor.launchTask(desc)
    }
  }
}
  • 任务分配与资源管理SchedulerBackend 启动执行器,并在执行器上启动任务。

8. ActiveJob:运行时 Job 封装

ActiveJob 代表一个正在运行的 Job,它封装了 Job 的执行状态、最终结果以及监听器等。DAGScheduler 通过 ActiveJob 追踪 Job 的执行进度。

// ActiveJob.scala
private[scheduler] class ActiveJob(
  val jobId: Int,
  val finalStage: Stage,
  val listener: JobListener) {
    
  def jobFinished(): Unit = {
    listener.jobSucceeded(this)
  }
}
  • ActiveJob 的作用:当 Job 完成时,ActiveJob 会通知 JobListener,并进行相应的状态更新。

9. MapOutputTracker:Shuffle 依赖的输出跟踪

MapOutputTracker 负责跟踪 Shuffle 操作的输出位置。它是 Spark 在 Stage 之间执行数据交换的重要组件,尤其是在处理宽依赖时,用于决定下游 Stage 应从哪个节点获取数据。

// MapOutputTracker.scala
class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) {
  def registerMapOutput(
    shuffleId: Int, mapId: Int, status: MapStatus): Unit = {
    shuffleStatuses(shuffleId).addMapStatus(mapId, status)
  }
}
  • Shuffle 依赖解析MapOutputTracker 在 Stage 之间的数据传输过程中,决定数据的获取方式。

10. JobListener:监听 Job 的完成

JobListener 是用于监听 Job 执行状态的接口,当 Job 成功或失败时,JobListener 会通知相关组件并进行状态更新。

// JobListener.scala
trait JobListener {
  def jobSucceeded(job: ActiveJob): Unit
  def jobFailed(job: ActiveJob, exception: Exception): Unit
}

11. 组件之间的关系总结

  • DAGScheduler:负责整个 Job 的生命周期管理,包括阶段划分、任务调度、任务失败重试等。
  • StageJob 被分解为多个 Stage,每个 Stage 对应于一组可并行执行的 Task
  • Task 和 TaskSetStage 被分解为多个 Task,并通过 TaskSet 提交执行。
  • TaskScheduler 和 SchedulerBackendTaskScheduler 负责将任务调度到集群中具体的节点,SchedulerBackend 负责与集群管理器交互,启动执行器并分配任务。
  • ActiveJob:代表正在运行的 Job,并与 JobListener 一起跟踪其状态。
  • MapOutputTracker:负责 Shuffle 操作中,数据的输出位置跟踪与管理。

总结

  • Job 是 Spark 中的核心执行单元,代表一次具体的计算操作。Job 由用户提交的 Action 操作触发,并通过 DAGScheduler 进行调度。
  • DAGScheduler 将 Job 分解为多个 Stage,每个 Stage 进一步被分解为多个 Task,这些任务通过 TaskScheduler 调度到集群的执行器上执行。
  • 各组件之间的协作 确保了 Spark 中大规模并行计算的高效执行。

http://www.kler.cn/a/330580.html

相关文章:

  • moviepy 将mp4视频文件提取音频mp3 - python 实现
  • Java Spring Boot实现基于URL + IP访问频率限制
  • C++ 如何将 gRPC集成到机器人系统中
  • css出现边框
  • 基于单片机的数字气压计设计
  • Python中的可变对象与不可变对象;Python中的六大标准数据类型哪些属于可变对象,哪些属于不可变对象
  • windows10或11家庭版实现远程桌面连接控制
  • 【GO语言】卡尔曼滤波例程
  • MySQL 实验 2:数据库的创建与管理
  • 管理方法(12)-- 采购管理
  • Elasticsearch 实战应用:从入门到项目集成
  • [2024年]最新VMware Workstation虚拟机下载 带链接
  • 基于微信的乐室预约小程序+ssm(lw+演示+源码+运行)
  • 根据给定的相机和镜头参数,估算相机的内参。
  • Linux 性能调优技巧
  • Java项目实战II基于Java+Spring Boot+MySQL的美发门店管理系统(源码+数据库+文档)
  • 探索Elastic Search:强大的开源搜索引擎,详解及使用
  • 听说这是MATLAB基础?
  • React 有哪些 Hooks
  • RabbitMQ基本原理
  • 算法闭关修炼百题计划(一)
  • FreeRTOS(四)FreeRTOS列表与列表项
  • 自定义 CSS 和 t-att-class 的使用
  • 机器学习3--numpy
  • rabbitMq------连接管理模块
  • 【重学 MySQL】五十三、MySQL数据类型概述和字符集设置