Spark 任务与 Spark Streaming 任务的差异详解
Spark 任务与 Spark Streaming 任务的主要差异源自于两者的应用场景不同:Spark 主要处理静态的大数据集,而 Spark Streaming 处理的是实时流数据。这些差异体现在任务的调度、执行、容错、数据处理模式等方面。
接下来,我们将从底层原理和源代码的角度详细解析 Spark 任务和 Spark Streaming 任务的差别。
1. 任务调度模型差异
1.1 Spark 任务的调度模型
Spark 的任务调度基于 DAGScheduler 和 TaskScheduler 进行:
-
DAG 构建:在 Spark 中,每个作业会被构建成一个有向无环图(DAG)。DAG 的顶点代表不同的 RDD 转换操作,而边则表示 RDD 之间的依赖关系。Spark 的
DAGScheduler
根据 DAG 划分阶段(Stage),每个阶段会生成多个任务。 -
Task 的生成和分发:Spark 中,任务是由 RDD 的各个分区(Partition)构成的。每个分区都会对应生成一个 Task,Task 通过
TaskScheduler
被分发给不同的 Executor 节点执行。
Spark 任务调度流程
def submitJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit
): JobWaiter[U] = {
val jobId = nextJobId.getAndIncrement()
val dagScheduler = new DAGScheduler()
dagScheduler.submitJob(rdd, func, partitions, resultHandler)
}
在 Spark 任务中,submitJob()
方法负责将 RDD 转换成一组任务,并通过 DAGScheduler
提交这些任务。每个阶段包含多个任务,任务根据 RDD 的分区数来确定。
-
Stage 划分:
DAGScheduler
依据宽依赖(宽依赖会导致数据 shuffle)对 DAG 进行分解,将作业分成多个 Stage。每个 Stage 内的 Task 彼此独立并行。 -
Task 分发:
TaskScheduler
负责将 Task 分发到不同的 Executor 上执行,具体的分发逻辑根据集群资源情况和数据本地性进行优化。
任务执行(Executor)
在 Executor 上,Task
被实际执行。每个 Task 在一个 TaskContext 中运行,并将结果返回到 Driver:
class Executor {
def launchTask(task: Task[_]): Unit = {
val taskResult = task.run()
sendResultToDriver(taskResult)
}
}
1.2 Spark Streaming 任务的调度模型
与 Spark 不同,Spark Streaming 处理的是 实时数据流,其调度模式基于微批处理(micro-batch processing)。
- 微批处理:Spark Streaming 会将实时流数据分成小时间段的微批次(通常是数秒钟),并将每个时间段的数据视为一个静态的 RDD 来进行处理。
微批次调度流程
def start(): Unit = synchronized {
jobScheduler.start()
receiverTracker.start()
}
Spark Streaming 中的 start()
方法启动了两个核心调度组件:
- JobScheduler:负责为每个微批次生成一组作业,并提交给
DAGScheduler
。 - ReceiverTracker:管理数据接收器,负责从外部数据源(如 Kafka)接收流数据。
JobScheduler 的任务调度逻辑
每个微批次对应一个 JobSet
,JobScheduler
会为每个时间间隔生成并提交一个 JobSet
,该 JobSet
包含多个 Job
,每个 Job
又对应一个 RDD 转换。
class JobScheduler {
def generateJob(time: Time): Option[JobSet] = {
val jobs = createJobsForTime(time)
if (jobs.nonEmpty) {
Some(new JobSet(time, jobs))
} else {
None
}
}
def submitJobSet(jobSet: JobSet): Unit = {
dagScheduler.submitJobSet(jobSet)
}
}
-
JobSet:
JobSet
表示在一个微批次时间点内,所有需要执行的作业集合。每个JobSet
都会被提交到DAGScheduler
,并最终生成 Spark 任务进行执行。 -
周期性调度:
JobScheduler
会周期性地(根据流的批次间隔)调用generateJob
方法来创建新一轮的任务,确保数据的实时处理。
Streaming 任务执行
与 Spark 任务一样,Spark Streaming 的任务也是由 TaskScheduler
提交到 Executor 上运行的。由于 Spark Streaming 基于微批处理的模型,本质上每个微批次处理的任务还是 Spark 的普通任务。
2. 数据处理模式差异
2.1 Spark 的数据处理模式
Spark 是基于 RDD(弹性分布式数据集)来进行数据处理的。RDD 是不可变的数据抽象,它支持两种操作:
- Transformations:如
map
、filter
等操作,会生成新的 RDD。 - Actions:如
collect
、count
,会触发计算并返回结果。
Spark 的数据处理模式是批处理模式,即:
- 一次读取整个数据集。
- 对数据集进行转换和计算。
- 最终一次性输出结果。
2.2 Spark Streaming 的数据处理模式
Spark Streaming 则是基于 离散化流(DStream) 进行数据处理。DStream 是一系列 RDD 的抽象,代表一段时间内的数据。
- 微批处理:在 Spark Streaming 中,数据不是一次性处理,而是将实时数据流划分成多个小的时间段(如 1 秒),每个时间段的数据形成一个 RDD。每个批次处理的数据都是有限的一个子集。
class DStream {
def compute(time: Time): Option[RDD[T]] = {
// 生成时间点上的 RDD
}
}
- 持续性计算:DStream 会周期性地生成 RDD 并执行计算,这与 Spark 中一次性计算数据集有显著不同。
3. 任务的生命周期
3.1 Spark 任务的生命周期
在 Spark 中,任务的生命周期是 一次性的,针对静态数据集。作业被提交后,DAGScheduler
会将其划分成多个 Stage,每个 Stage 会生成一组 Task。这些 Task 被执行后,数据计算完成,作业结束。
任务的执行流程
- Driver 启动作业,生成 DAG 并划分 Stage。
- Task 被分配到 Executor 上执行。
- Task 执行完后,将结果返回到 Driver。
- 作业完成,任务生命周期结束。
3.2 Spark Streaming 任务的生命周期
在 Spark Streaming 中,任务的生命周期是 持续的,因为流数据是连续不断地到达的。Spark Streaming 的任务调度是基于时间间隔的,每隔一个时间窗口都会生成一批新的任务。
- 周期性任务生成:每个时间窗口会触发一次任务调度,生成一组新任务。
- 任务生命周期与数据流同步:只要流数据源持续有数据,任务就会持续被生成和执行。
任务的执行流程
- Driver 启动流计算应用,周期性生成微批次的任务。
- 每个微批次会生成一组作业,这些作业与 Spark 的批处理作业相似。
- Task 被分配到 Executor 上执行,处理当前批次的数据。
- 下一个时间窗口到达后,新的任务被生成。
4. 容错机制差异
4.1 Spark 的容错机制
Spark 的容错机制依赖于 RDD 的血缘关系(Lineage)。RDD 是不可变的,因此每个 RDD 都知道自己是如何通过转换操作(如 map
、filter
等)从父 RDD 派生出来的。这一信息被称为 血缘信息,它在数据丢失或任务失败时,能够重新计算丢失的数据。
4.1.1 血缘信息的作用
在 Spark 中,如果某个任务处理的分区(Partition)丢失,系统可以根据 RDD 的血缘信息,通过重新计算来恢复丢失的数据。RDD 的血缘信息是 Task 级别的容错基础。
abstract class RDD[T] {
// 血缘关系
def dependencies: Seq[Dependency[_]]
// 重新计算丢失的分区
def compute(partition: Partition, context: TaskContext): Iterator[T]
}
通过 dependencies
属性,RDD 可以记录其父 RDD 和依赖关系。如果某个分区数据丢失,系统可以根据这些依赖关系,重新计算该分区。
4.1.2 DAG 调度与任务重试
Spark 的调度器(DAGScheduler
)在执行作业时,会将其分解为多个阶段(Stage)。每个阶段包含一组 Task,这些 Task 是基于 RDD 的分区生成的。
- 当某个 Task 执行失败时,
DAGScheduler
会将该 Task 标记为失败,并根据血缘信息重新调度该任务。 - 默认情况下,Spark 会尝试 重新执行失败的任务。如果任务经过多次重试后仍然失败,Spark 会终止作业。
class DAGScheduler {
def handleTaskFailure(task: Task[_], reason: TaskFailedReason): Unit = {
val stage = task.stageAttemptId
if (stage != null && stage.failures < maxTaskFailures) {
// 重试任务
submitTask(stage, task.index)
} else {
// 任务失败次数过多,终止阶段
failStage(stage, reason)
}
}
}
- 在
DAGScheduler
中,失败的 Task 会被标记并重新调度。通过这种机制,Spark 保证了分区数据的可靠性,即使任务失败,也能够通过重试机制进行恢复。
4.1.3 宽依赖与窄依赖的容错性差异
-
窄依赖:每个子 RDD 的分区只依赖父 RDD 的一个或少量分区。比如
map
、filter
等操作。这类依赖容错性较好,因为只需要重新计算少量分区即可恢复数据。 -
宽依赖:每个子 RDD 的分区可能依赖多个父 RDD 的分区,比如
reduceByKey
、groupByKey
等。这种依赖通常需要进行数据的 shuffle 操作。在处理宽依赖时,数据恢复需要重新执行整个依赖链,这可能会涉及到大量数据重新计算,效率较低。
abstract class RDD[T] {
// 宽依赖或窄依赖
def dependencies: Seq[Dependency[_]]
}
4.2 Spark Streaming 的容错机制
相比 Spark,Spark Streaming 处理的是实时数据流,因此它的容错机制不仅要考虑任务失败,还要处理流数据的可靠接收、状态恢复等问题。
4.2.1 Write Ahead Logs (WAL)
为了保证数据不丢失,Spark Streaming 引入了 WAL(Write Ahead Log) 机制。WAL 通过将流数据持久化到日志中,确保即使节点或任务失败,数据也可以被恢复。
- 当 Spark Streaming 接收到流数据时,首先将数据写入 WAL 中进行持久化,然后才会进行计算。这确保了在任务失败或节点宕机后,系统可以从 WAL 中重新读取数据。
class WriteAheadLogBasedBlockHandler {
def storeBlock(streamId: Int, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult = {
// 将接收到的块写入 WAL
logManager.write(new WriteAheadLogRecord(serializedBlock))
// 然后存储到内存或磁盘
blockManager.putBlockData(blockId, serializedBlock, StorageLevel.MEMORY_AND_DISK_SER)
}
}
- WAL 机制确保了即使在任务执行失败后,流数据仍然能够通过日志重放来恢复。
4.2.2 Checkpointing(检查点)
Spark Streaming 的容错机制还包括 Checkpointing,它用于保存应用程序的元数据和状态信息。Checkpointing 可以分为两类:
- 元数据检查点:保存 StreamingContext、DStream 的结构信息,确保任务在重启后可以恢复之前的处理流程。
- 状态检查点:当使用有状态操作(如
updateStateByKey
)时,状态会被持久化到检查点中。
class StreamingContext {
def checkpoint(directory: String): Unit = {
this.checkpointDir = directory
checkpointWriter = new CheckpointWriter(checkpointDir, sc.env.blockManager)
}
}
- 在任务失败时,系统可以从检查点恢复状态和元数据,从而确保流处理继续进行。
4.2.3 任务失败重试
与 Spark 类似,Spark Streaming 也依赖于 DAGScheduler
和 TaskScheduler
进行任务重试。不过,由于 Spark Streaming 是基于微批处理的,每个批次处理的任务失败后,系统会重试整个批次的任务。
def handleBatchFailure(batchTime: Time, jobSet: JobSet): Unit = {
logWarning(s"Batch $batchTime failed. Retrying ...")
jobScheduler.submitJobSet(jobSet)
}
- 每个微批次的数据会生成一个
JobSet
,如果任务失败,系统会重新提交整个JobSet
。
4.2.4 Kafka 及其他流数据源的容错
对于像 Kafka 这样的流数据源,Spark Streaming 依赖于数据源的偏移量管理来实现容错。例如,Kafka 的偏移量(offset)用于追踪已经处理的数据位置。如果任务失败,Spark Streaming 会通过重新读取 Kafka 的偏移量来确保数据不会丢失。
class DirectKafkaInputDStream {
def createDirectStream[K, V](ssc: StreamingContext, kafkaParams: Map[String, Object], topics: Set[String]): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream(ssc, kafkaParams, topics)
}
}
在 DirectKafkaInputDStream
中,Spark Streaming 通过 Kafka 的偏移量追踪,确保每个微批次的数据都能可靠地重新读取和处理。
5. 数据处理模式的区别
5.1 Spark 的数据处理模式
Spark 处理的是 静态数据集,基于 RDD 的不可变性和分区(Partition)来并行处理数据。每个作业会被一次性提交,并将所有数据进行一次完整的计算。Spark 中常见的数据操作包括:
- Transformations:如
map
、flatMap
、filter
等操作用于转换 RDD。 - Actions:如
collect
、reduce
、count
等操作触发执行并返回结果。
Spark 的处理模式是批处理模式,它适用于静态的、离线的大数据集。
5.2 Spark Streaming 的数据处理模式
Spark Streaming 处理的是 实时数据流,其数据处理模式基于微批次。实时数据流被分割成小的时间片段,每个时间片段的数据被视为一个静态的 RDD 进行处理。
- DStream:DStream 是一系列 RDD 的抽象,代表了实时数据流在多个时间段内的处理结果。每个时间段的数据会形成一个新的 RDD 并进行计算。
class DStream {
def compute(time: Time): Option[RDD[T]] = {
// 生成对应时间段的 RDD
}
}
- 微批处理:每隔一个时间窗口,Spark Streaming 会生成一个新的 RDD,并对其进行处理。这种微批处理模式保证了实时数据的近实时处理。
6. 任务的生命周期差异
6.1 Spark 任务的生命周期
Spark 任务的生命周期是 一次性的,每个作业在提交后会经历以下几个步骤:
- Driver 解析作业并生成 DAG。
- DAG 被划分为多个 Stage。
- 每个 Stage 包含多个 Task,任务被分发到 Executor 执行。
- 任务执行完成后,数据被返回到 Driver,作业结束。
在批处理场景下,任务生命周期较短,处理完数据后任务即结束。
6.2 Spark Streaming 任务的生命周期
Spark Streaming 任务的生命周期是 持续的。Spark Streaming 是一个 长时间运行的任务,只要流数据源不断输入数据,任务就会持续生成新的微批次任务并进行计算。
- StreamingContext 启动后,
JobScheduler
定期生成微批次任务。 - 每个微批次会生成新的
JobSet
并提交给DAGScheduler
执行。 - 任务处理完成后,新的数据批次到达,继续生成新的任务。
- 任务不断运行,直到用户手动停止。
总结
- 任务调度:Spark 任务基于静态数据集,采用一次性批处理模式;Spark Streaming 任务基于流数据,采用微批处理模式,每隔一个时间窗口生成新的任务。
- 数据处理:Spark 处理静态的 RDD,数据只计算一次;Spark Streaming 处理离散化的流数据,每个时间窗口生成一个新的 RDD 并计算。
- 容错机制:Spark 任务依赖 RDD 血缘关系进行数据恢复;Spark Streaming 除了依赖血缘关系外,还引入了 WAL 和 Checkpointing 来保证流数据的容错性。
- 生命周期:Spark 任务是一次性执行的,而 Spark Streaming 是长时间运行的任务,会持续生成新的微批次进行处理。