Spark 介绍
Spark 架构
Spark 是一个基于内存计算的大数据处理框架,相比 Hadoop 的 MapReduce,它能够提供 更高效的迭代计算 和 流式计算能力。Spark 采用 主从架构(Master-Slave),主要包括 Driver、Cluster Manager、Worker、Executor 和 Task 等核心组件。
1. Spark 组件架构
1.1 核心组件
组件 | 作用 |
---|---|
Driver(驱动程序) | 负责任务调度,向 Cluster Manager 申请资源,管理 SparkContext。 |
Cluster Manager(集群管理器) | 负责资源调度,如 Standalone、YARN、Mesos、Kubernetes。 |
Worker(工作节点) | 运行在集群节点上,管理 Executor 进程,执行具体计算任务。 |
Executor(执行器) | 由 Worker 启动,执行 Spark 任务,并存储中间计算数据。 |
Task(任务) | 运行在 Executor 之上,每个 Stage 被划分为多个 Task 并行执行。 |
1.2 Spark 架构示意图
+------------------------------------------------------+
| Driver |
| - 任务调度 |
| - 运行 SparkContext |
| - 将 Job 划分为多个 Stage |
+------------------------------------------------------+
| 向集群管理器申请资源
V
+------------------------------------------------------+
| Cluster Manager |
| - 资源调度 |
| - 可选:Standalone / YARN / Mesos / Kubernetes |
+------------------------------------------------------+
| 分配 Worker 节点
V
+----------------+ +----------------+ +----------------+
| Worker 1 | | Worker 2 | | Worker 3 |
| - 启动 Executor | | - 启动 Executor | | - 启动 Executor |
| - 执行 Task | | - 执行 Task | | - 执行 Task |
+----------------+ +----------------+ +----------------+
2. Spark 运行模式
Spark 可以运行在不同的集群管理器上:
- Standalone:Spark 自带的资源管理器,简单易用,适合小规模集群。
- YARN(Hadoop Yarn 集群):适合 Hadoop 生态环境。
- Mesos(Apache Mesos 集群):适合多租户资源调度。
- Kubernetes(K8s 集群):适用于云计算和容器化部署。
3. Spark 任务执行流程
Spark 任务的执行大致分为以下几个步骤:
3.1 任务提交
- Driver 进程启动 SparkContext,并向 Cluster Manager 申请资源。
- Cluster Manager 分配 Worker 节点,并在 Worker 上 启动 Executor。
3.2 Job 分解
- Driver 将 Job 拆分为多个 Stage(基于 DAG 计算)。
- 每个 Stage 由多个 Task 组成,并被分配到不同的 Executor 运行。
3.3 Task 执行
- Executor 执行 Task,计算数据并存储中间结果(RDD)。
- Executor 向 Driver 汇报任务执行状态,若失败则重新调度 Task。
3.4 结果返回
- 任务执行完成后,Driver 收集最终计算结果,存储到 HDFS、Kafka、MySQL 等。
4. Spark 计算模型
Spark 计算任务是基于 RDD(Resilient Distributed Dataset) 和 DAG(有向无环图) 进行调度的。
4.1 RDD(弹性分布式数据集)
RDD 是 Spark 最核心的数据抽象,提供:
- 分区(Partition):数据被分成多个分区,并行计算。
- 容错性:基于 Lineage(血缘) 记录转换关系,支持自动恢复。
- 惰性计算:只有在 Action 触发时,RDD 才会真正执行计算。
RDD 转换类型:
- Transformation(转换):如
map()
、filter()
、flatMap()
(不会立即执行)。 - Action(行动):如
count()
、collect()
、saveAsTextFile()
(触发计算)。
4.2 DAG(有向无环图)
- Spark 任务会构建 DAG(DAGScheduler),将 RDD 之间的依赖关系转换为多个 Stage。
- 每个 Stage 由 多个 Task 组成,并行执行计算任务。
示例:
val data = sc.textFile("hdfs://input.txt") // RDD1
val words = data.flatMap(_.split(" ")) // RDD2(Transformation)
val wordCount = words.map((_, 1)) // RDD3(Transformation)
val result = wordCount.reduceByKey(_ + _) // RDD4(Transformation)
result.saveAsTextFile("hdfs://output.txt") // Action 触发计算
Spark 内部执行过程:
- DAG 构建阶段:
- RDD1 -> RDD2 -> RDD3 -> RDD4
- Stage 划分阶段:
flatMap()
和map()
形成 Stage 1reduceByKey()
形成 Stage 2
- Task 并行执行:
- 每个 Stage 划分多个 Task,并分发到 Executor 执行。
5. Spark 生态组件
Spark 具备丰富的生态系统,适用于不同场景:
组件 | 作用 |
---|---|
Spark Core | RDD API,DAG 调度,任务执行。 |
Spark SQL | 运行 SQL 查询,支持 DataFrame、Dataset API。 |
Spark Streaming | 实时流处理,支持 Kafka、Flume 等数据源。 |
MLlib | 机器学习库,支持 K-Means、决策树等算法。 |
GraphX | 图计算引擎,支持 PageRank、社区检测等。 |
6. Spark 与 Hadoop 对比
对比项 | Spark | Hadoop(MapReduce) |
---|---|---|
计算模型 | RDD 内存计算 | 磁盘读写 |
速度 | 高速,适用于流计算 | 慢,适用于批处理 |
容错机制 | RDD 通过 Lineage 恢复 | 任务失败后重跑 |
适用场景 | 实时计算、流处理 | 批处理、大规模数据存储 |
7. 适用场景
- 数据分析(数据挖掘、数据清洗)
- 实时流计算(结合 Kafka 实现流式数据处理)
- 机器学习(推荐系统、分类预测)
- 图计算(社交网络分析、PageRank)
总结
Spark 采用 Driver + Executor 的分布式架构,基于 RDD 进行数据计算,通过 DAG 调度任务,并支持 SQL、流式计算、机器学习 等多种应用场景。相较于 Hadoop,Spark 计算更快,适合 大数据分析、实时计算和 AI 训练。
Checkpoint
Spark 中的 Checkpoint 作用
Checkpoint(检查点) 主要用于 RDD 持久化和容错,可以将 RDD 的数据存储到**持久化存储(如 HDFS、S3)**中,以便在失败时快速恢复计算,避免从头计算整个 DAG。
1. 为什么需要 Checkpoint?
在 Spark 中,RDD 具有血缘关系(Lineage),Spark 通过血缘追踪来进行故障恢复。如果某个计算任务失败,Spark 会重新从原始数据集按照血缘关系重新计算。
但是,在以下情况下,依赖血缘恢复可能导致 高额计算开销:
- RDD 计算链路太长:如果 RDD 经过多次 Transformation,失败后重新计算的开销会很大。
- Driver 内存溢出:RDD 的血缘信息存储在 Driver 中,过长的 Lineage 可能会导致 Driver 负担过重,甚至 OOM。
- 需要数据持久化:某些情况下(如流式计算),需要持久化部分数据以便后续任务读取。
Checkpoint 可以 截断 RDD 血缘依赖,将计算结果持久化,避免重复计算,提高容错能力。
2. Checkpoint 的作用
(1) 提高容错能力
- 在 RDD 发生丢失时,不再依赖 Lineage 重新计算,而是直接从持久化存储中加载数据,提高恢复速度。
(2) 减少 DAG 依赖
- 通过 Checkpoint 截断 RDD 的血缘依赖,避免 DAG 过长,减少 Driver 负担。
(3) 持久化计算结果
- 适用于需要在不同任务中复用的 RDD,如流式计算(Spark Streaming)中的状态数据。
3. Checkpoint vs Cache
Checkpoint | Cache / Persist | |
---|---|---|
存储位置 | 持久化到HDFS / S3 / 本地磁盘 | 存储在Executor 的内存 / 磁盘 |
数据存储方式 | 持久化后会丢弃 RDD 血缘信息 | 保留 RDD 血缘信息 |
恢复方式 | 任务失败后直接从 Checkpoint 读取 | 任务失败后需要从头重新计算 |
适用场景 | 长计算链路 / 流式计算 / 容错 | 短期数据复用 / 内存充足 |
- Cache/Persist 适用于频繁访问数据,但不能容错,如果 Executor 挂掉,数据会丢失,需要重新计算。
- Checkpoint 适用于长计算 DAG 或需要持久化数据的场景,但由于存储到 HDFS,速度较慢。
4. Checkpoint 使用方式
(1) 开启 Checkpoint
在使用 Checkpoint 之前,需要设置存储目录:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("CheckpointExample").setMaster("local[*]")
val sc = new SparkContext(conf)
// 1. 设置 Checkpoint 存储路径
sc.setCheckpointDir("hdfs://namenode:9000/spark-checkpoint")
// 2. 创建 RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 3. 设置 Checkpoint
rdd.checkpoint()
// 4. 触发计算
rdd.count()
sc.setCheckpointDir(path)
设置 Checkpoint 目录(必须是 HDFS、S3 或本地持久化存储)。rdd.checkpoint()
标记 RDD 需要 Checkpoint。- 由于 Checkpoint 是惰性执行的,必须在
Action
(如count()
、collect()
)时触发计算并存储。
(2) 与 Cache 结合使用
由于 Checkpoint 计算会重新执行整个 DAG,可以先 cache()
,然后 checkpoint()
,避免重复计算:
val rdd = sc.textFile("hdfs://namenode:9000/data.txt").map(_.split(" "))
rdd.cache() // 缓存 RDD 避免重复计算
rdd.checkpoint() // 持久化数据
rdd.count() // 触发计算
cache()
先把数据缓存到内存,避免在 checkpoint 时重复计算。
5. Checkpoint 在 Spark Streaming 中的应用
在 Spark Streaming 中,Checkpoint 用于存储 Streaming 计算状态,保证数据处理的容错性,防止任务重启后状态丢失。
(1) 设置 Checkpoint 目录
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
// 设置 Checkpoint 目录
ssc.checkpoint("hdfs://namenode:9000/streaming-checkpoint")
// 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// 启动流式计算
ssc.start()
ssc.awaitTermination()
ssc.checkpoint(path)
设置 Checkpoint 目录,用于存储流式计算的状态数据(如窗口聚合数据)。- 适用于 窗口操作(window, updateStateByKey) 场景。
6. 总结
-
Checkpoint 作用:
- 持久化 RDD,避免 DAG 过长导致计算性能下降。
- 提高容错性,避免 Executor 挂掉时重算整个 DAG。
- 适用于 Streaming 计算,存储流式数据状态。
-
使用方法:
- 先
sc.setCheckpointDir()
设置目录。 - 对 RDD 调用
checkpoint()
。 - 触发 Action(如
count()
)来执行 checkpoint 计算。
- 先
-
Checkpoint vs Cache
- Cache/Persist 适用于临时缓存,提高性能,但不具备容错能力。
- Checkpoint 适用于长计算链路、流式计算,保证容错,但性能略慢。
🚀 最佳实践:
- 长时间运行的任务(如 Spark Streaming)必须开启 Checkpoint。
- Checkpoint 和 Cache 结合使用,避免重复计算导致性能下降。
并行度
Apache Spark 是一个分布式并行计算框架,基于 RDD(弹性分布式数据集) 进行并行计算,并利用集群资源提高计算效率。
Spark 的计算模型遵循 MapReduce 的思想,但相比 Hadoop,Spark 采用 内存计算,并且支持更加细粒度的任务调度和优化,大大提升了计算性能。
Spark 的并行度(parallelism) 取决于以下几个因素:
- RDD 的分区数(Partitions)
- Executor 的数量
- CPU 核心数
- 并行任务数(Task 并发数)
1. RDD 的分区数
在 Spark 中,RDD 是由多个 分区(Partitions) 组成的,每个分区可以在一个 Task 中独立计算,因此分区数决定了并行度。
- 默认情况下:
sc.textFile(path)
读取 HDFS 文件时,分区数 = HDFS block 数量(通常是 128MB 一个 block)。sc.parallelize(data, numSlices)
允许手动指定分区数numSlices
。
示例:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3) // 设置 3 个分区
println(rdd.partitions.length) // 输出: 3
分区数越多,并行度越高,但过多的分区会导致 任务调度开销增加,降低整体效率。
2. Executor 并行度
Executor 是 Spark 任务的执行单元,每个 Executor 拥有多个 CPU 核心,可同时运行多个 Task。
- Executor 并行度计算方式:
[
并行度 = Executors 数量 \times 每个 Executor 的 CPU 核心数
]
例如:
--num-executors 5 --executor-cores 4
表示:
- 5 个 Executors
- 每个 Executor 4 核心
- 最大并行 Task 数 = 5 × 4 = 20
3. 并行任务数(Task 并发数)
Spark 会按照RDD 分区数来决定 Task 数量,并由集群的可用资源(Executor 和 核心数)来决定同时能运行的 Task 数量。
并行任务数计算公式:
[
并行任务数 = min( RDD 分区数, 总 CPU 核心数 )
]
例如:
- RDD 分区数 = 100
- Spark 资源 = 10 Executors,每个 4 核心
- 总可用核心数 = 10 × 4 = 40
并行度 = min(100, 40) = 40(同时执行 40 个 Task)
如何调整并行度?
- 增加 RDD 分区数:
rdd.repartition(n)
(增加或减少分区)rdd.coalesce(n)
(减少分区,避免数据洗牌)
- 增加 Executor 核心数
--executor-cores N
--num-executors M
- 增加 Task 并发
spark.default.parallelism
(全局默认并行度)spark.sql.shuffle.partitions
(SQL Shuffle 时的分区数)
示例:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 10) // 增加分区数提高并行度
总结
影响因素 | 说明 | 影响并行度 |
---|---|---|
RDD 分区数 | 任务并行度取决于分区数 | 分区数越多并行度越高 |
Executor 数量 | 任务运行的执行节点数量 | Executors 越多并行度越高 |
Executor 核心数 | 每个 Executor 可并行运行的 Task 数 | 核心数越多并行度越高 |
Task 并发数 | Task 调度和 CPU 资源影响并发 | Task 数量受 CPU 资源限制 |
🔥 最佳实践:
- 大数据计算时,确保 RDD 分区数 ≥ 任务 CPU 核心数,以充分利用计算资源。
- 避免单个 Task 计算过长,导致 CPU 资源利用率低下。
- Spark SQL 计算时,适当调整
spark.sql.shuffle.partitions
(默认 200),减少 Shuffle 代价。
🚀 结论:
Spark 是 并行计算框架,并行度主要由 RDD 分区数、Executor 数量、CPU 核心数、任务调度 共同决定,合理调整参数可以优化计算性能。