当前位置: 首页 > article >正文

Spark 介绍

Spark 架构

Spark 是一个基于内存计算的大数据处理框架,相比 Hadoop 的 MapReduce,它能够提供 更高效的迭代计算流式计算能力。Spark 采用 主从架构(Master-Slave),主要包括 Driver、Cluster Manager、Worker、Executor 和 Task 等核心组件。

1. Spark 组件架构

1.1 核心组件

组件作用
Driver(驱动程序)负责任务调度,向 Cluster Manager 申请资源,管理 SparkContext。
Cluster Manager(集群管理器)负责资源调度,如 Standalone、YARN、Mesos、Kubernetes。
Worker(工作节点)运行在集群节点上,管理 Executor 进程,执行具体计算任务。
Executor(执行器)由 Worker 启动,执行 Spark 任务,并存储中间计算数据。
Task(任务)运行在 Executor 之上,每个 Stage 被划分为多个 Task 并行执行。

1.2 Spark 架构示意图

+------------------------------------------------------+
|                        Driver                        |
|  - 任务调度                                           |
|  - 运行 SparkContext                                |
|  - 将 Job 划分为多个 Stage                           |
+------------------------------------------------------+
             | 向集群管理器申请资源
             V
+------------------------------------------------------+
|                  Cluster Manager                     |
|  - 资源调度                                          |
|  - 可选:Standalone / YARN / Mesos / Kubernetes      |
+------------------------------------------------------+
             | 分配 Worker 节点
             V
+----------------+      +----------------+      +----------------+
|    Worker 1    |      |    Worker 2    |      |    Worker 3    |
|  - 启动 Executor  |      |  - 启动 Executor  |      |  - 启动 Executor  |
|  - 执行 Task    |      |  - 执行 Task    |      |  - 执行 Task    |
+----------------+      +----------------+      +----------------+

2. Spark 运行模式

Spark 可以运行在不同的集群管理器上:

  • Standalone:Spark 自带的资源管理器,简单易用,适合小规模集群。
  • YARN(Hadoop Yarn 集群):适合 Hadoop 生态环境。
  • Mesos(Apache Mesos 集群):适合多租户资源调度。
  • Kubernetes(K8s 集群):适用于云计算和容器化部署。

3. Spark 任务执行流程

Spark 任务的执行大致分为以下几个步骤:

3.1 任务提交

  1. Driver 进程启动 SparkContext,并向 Cluster Manager 申请资源
  2. Cluster Manager 分配 Worker 节点,并在 Worker 上 启动 Executor

3.2 Job 分解

  1. Driver 将 Job 拆分为多个 Stage(基于 DAG 计算)。
  2. 每个 Stage 由多个 Task 组成,并被分配到不同的 Executor 运行。

3.3 Task 执行

  1. Executor 执行 Task,计算数据并存储中间结果(RDD)。
  2. Executor 向 Driver 汇报任务执行状态,若失败则重新调度 Task。

3.4 结果返回

  1. 任务执行完成后,Driver 收集最终计算结果,存储到 HDFS、Kafka、MySQL 等。

4. Spark 计算模型

Spark 计算任务是基于 RDD(Resilient Distributed Dataset)DAG(有向无环图) 进行调度的。

4.1 RDD(弹性分布式数据集)

RDD 是 Spark 最核心的数据抽象,提供:

  • 分区(Partition):数据被分成多个分区,并行计算。
  • 容错性:基于 Lineage(血缘) 记录转换关系,支持自动恢复。
  • 惰性计算:只有在 Action 触发时,RDD 才会真正执行计算。

RDD 转换类型:

  • Transformation(转换):如 map()filter()flatMap()(不会立即执行)。
  • Action(行动):如 count()collect()saveAsTextFile()(触发计算)。

4.2 DAG(有向无环图)

  • Spark 任务会构建 DAG(DAGScheduler),将 RDD 之间的依赖关系转换为多个 Stage
  • 每个 Stage 由 多个 Task 组成,并行执行计算任务。

示例:

val data = sc.textFile("hdfs://input.txt")  // RDD1
val words = data.flatMap(_.split(" "))      // RDD2(Transformation)
val wordCount = words.map((_, 1))           // RDD3(Transformation)
val result = wordCount.reduceByKey(_ + _)   // RDD4(Transformation)
result.saveAsTextFile("hdfs://output.txt")  // Action 触发计算

Spark 内部执行过程:

  1. DAG 构建阶段
    • RDD1 -> RDD2 -> RDD3 -> RDD4
  2. Stage 划分阶段
    • flatMap()map() 形成 Stage 1
    • reduceByKey() 形成 Stage 2
  3. Task 并行执行
    • 每个 Stage 划分多个 Task,并分发到 Executor 执行。

5. Spark 生态组件

Spark 具备丰富的生态系统,适用于不同场景:

组件作用
Spark CoreRDD API,DAG 调度,任务执行。
Spark SQL运行 SQL 查询,支持 DataFrame、Dataset API。
Spark Streaming实时流处理,支持 Kafka、Flume 等数据源。
MLlib机器学习库,支持 K-Means、决策树等算法。
GraphX图计算引擎,支持 PageRank、社区检测等。

6. Spark 与 Hadoop 对比

对比项SparkHadoop(MapReduce)
计算模型RDD 内存计算磁盘读写
速度高速,适用于流计算慢,适用于批处理
容错机制RDD 通过 Lineage 恢复任务失败后重跑
适用场景实时计算、流处理批处理、大规模数据存储

7. 适用场景

  • 数据分析(数据挖掘、数据清洗)
  • 实时流计算(结合 Kafka 实现流式数据处理)
  • 机器学习(推荐系统、分类预测)
  • 图计算(社交网络分析、PageRank)

总结

Spark 采用 Driver + Executor 的分布式架构,基于 RDD 进行数据计算,通过 DAG 调度任务,并支持 SQL、流式计算、机器学习 等多种应用场景。相较于 Hadoop,Spark 计算更快,适合 大数据分析、实时计算和 AI 训练

Checkpoint

Spark 中的 Checkpoint 作用

Checkpoint(检查点) 主要用于 RDD 持久化和容错,可以将 RDD 的数据存储到**持久化存储(如 HDFS、S3)**中,以便在失败时快速恢复计算,避免从头计算整个 DAG。

1. 为什么需要 Checkpoint?

在 Spark 中,RDD 具有血缘关系(Lineage),Spark 通过血缘追踪来进行故障恢复。如果某个计算任务失败,Spark 会重新从原始数据集按照血缘关系重新计算
但是,在以下情况下,依赖血缘恢复可能导致 高额计算开销

  1. RDD 计算链路太长:如果 RDD 经过多次 Transformation,失败后重新计算的开销会很大。
  2. Driver 内存溢出:RDD 的血缘信息存储在 Driver 中,过长的 Lineage 可能会导致 Driver 负担过重,甚至 OOM。
  3. 需要数据持久化:某些情况下(如流式计算),需要持久化部分数据以便后续任务读取。

Checkpoint 可以 截断 RDD 血缘依赖,将计算结果持久化,避免重复计算,提高容错能力。

2. Checkpoint 的作用

(1) 提高容错能力

  • 在 RDD 发生丢失时,不再依赖 Lineage 重新计算,而是直接从持久化存储中加载数据,提高恢复速度。

(2) 减少 DAG 依赖

  • 通过 Checkpoint 截断 RDD 的血缘依赖,避免 DAG 过长,减少 Driver 负担。

(3) 持久化计算结果

  • 适用于需要在不同任务中复用的 RDD,如流式计算(Spark Streaming)中的状态数据。

3. Checkpoint vs Cache

CheckpointCache / Persist
存储位置持久化到HDFS / S3 / 本地磁盘存储在Executor 的内存 / 磁盘
数据存储方式持久化后会丢弃 RDD 血缘信息保留 RDD 血缘信息
恢复方式任务失败后直接从 Checkpoint 读取任务失败后需要从头重新计算
适用场景长计算链路 / 流式计算 / 容错短期数据复用 / 内存充足
  • Cache/Persist 适用于频繁访问数据,但不能容错,如果 Executor 挂掉,数据会丢失,需要重新计算。
  • Checkpoint 适用于长计算 DAG 或需要持久化数据的场景,但由于存储到 HDFS,速度较慢。

4. Checkpoint 使用方式

(1) 开启 Checkpoint

在使用 Checkpoint 之前,需要设置存储目录

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("CheckpointExample").setMaster("local[*]")
val sc = new SparkContext(conf)

// 1. 设置 Checkpoint 存储路径
sc.setCheckpointDir("hdfs://namenode:9000/spark-checkpoint")

// 2. 创建 RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

// 3. 设置 Checkpoint
rdd.checkpoint()

// 4. 触发计算
rdd.count()
  • sc.setCheckpointDir(path) 设置 Checkpoint 目录(必须是 HDFS、S3 或本地持久化存储)。
  • rdd.checkpoint() 标记 RDD 需要 Checkpoint
  • 由于 Checkpoint 是惰性执行的,必须在 Action(如 count()collect())时触发计算并存储。

(2) 与 Cache 结合使用

由于 Checkpoint 计算会重新执行整个 DAG,可以先 cache(),然后 checkpoint(),避免重复计算:

val rdd = sc.textFile("hdfs://namenode:9000/data.txt").map(_.split(" "))

rdd.cache()  // 缓存 RDD 避免重复计算
rdd.checkpoint()  // 持久化数据

rdd.count()  // 触发计算
  • cache() 先把数据缓存到内存,避免在 checkpoint 时重复计算

5. Checkpoint 在 Spark Streaming 中的应用

Spark Streaming 中,Checkpoint 用于存储 Streaming 计算状态,保证数据处理的容错性,防止任务重启后状态丢失。

(1) 设置 Checkpoint 目录

import org.apache.spark.streaming.{Seconds, StreamingContext}

// 创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))

// 设置 Checkpoint 目录
ssc.checkpoint("hdfs://namenode:9000/streaming-checkpoint")

// 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// 启动流式计算
ssc.start()
ssc.awaitTermination()
  • ssc.checkpoint(path) 设置 Checkpoint 目录,用于存储流式计算的状态数据(如窗口聚合数据)。
  • 适用于 窗口操作(window, updateStateByKey) 场景。

6. 总结

  • Checkpoint 作用:

    1. 持久化 RDD,避免 DAG 过长导致计算性能下降。
    2. 提高容错性,避免 Executor 挂掉时重算整个 DAG。
    3. 适用于 Streaming 计算,存储流式数据状态。
  • 使用方法:

    1. sc.setCheckpointDir() 设置目录。
    2. 对 RDD 调用 checkpoint()
    3. 触发 Action(如 count())来执行 checkpoint 计算。
  • Checkpoint vs Cache

    • Cache/Persist 适用于临时缓存,提高性能,但不具备容错能力
    • Checkpoint 适用于长计算链路、流式计算,保证容错,但性能略慢

🚀 最佳实践:

  • 长时间运行的任务(如 Spark Streaming)必须开启 Checkpoint
  • Checkpoint 和 Cache 结合使用,避免重复计算导致性能下降。

并行度

Apache Spark 是一个分布式并行计算框架,基于 RDD(弹性分布式数据集) 进行并行计算,并利用集群资源提高计算效率。

Spark 的计算模型遵循 MapReduce 的思想,但相比 Hadoop,Spark 采用 内存计算,并且支持更加细粒度的任务调度和优化,大大提升了计算性能。

Spark 的并行度(parallelism) 取决于以下几个因素:

  1. RDD 的分区数(Partitions)
  2. Executor 的数量
  3. CPU 核心数
  4. 并行任务数(Task 并发数)

1. RDD 的分区数

在 Spark 中,RDD 是由多个 分区(Partitions) 组成的,每个分区可以在一个 Task 中独立计算,因此分区数决定了并行度

  • 默认情况下:
    • sc.textFile(path) 读取 HDFS 文件时,分区数 = HDFS block 数量(通常是 128MB 一个 block)。
    • sc.parallelize(data, numSlices) 允许手动指定分区数 numSlices

示例:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3) // 设置 3 个分区
println(rdd.partitions.length)  // 输出: 3

分区数越多,并行度越高,但过多的分区会导致 任务调度开销增加,降低整体效率。

2. Executor 并行度

Executor 是 Spark 任务的执行单元,每个 Executor 拥有多个 CPU 核心,可同时运行多个 Task。

  • Executor 并行度计算方式:
    [
    并行度 = Executors 数量 \times 每个 Executor 的 CPU 核心数
    ]

例如:

--num-executors 5  --executor-cores 4

表示:

  • 5 个 Executors
  • 每个 Executor 4 核心
  • 最大并行 Task 数 = 5 × 4 = 20

3. 并行任务数(Task 并发数)

Spark 会按照RDD 分区数来决定 Task 数量,并由集群的可用资源(Executor 和 核心数)来决定同时能运行的 Task 数量

并行任务数计算公式:
[
并行任务数 = min( RDD 分区数, 总 CPU 核心数 )
]
例如:

  • RDD 分区数 = 100
  • Spark 资源 = 10 Executors,每个 4 核心
  • 总可用核心数 = 10 × 4 = 40

并行度 = min(100, 40) = 40(同时执行 40 个 Task)

如何调整并行度?

  1. 增加 RDD 分区数
    • rdd.repartition(n)(增加或减少分区)
    • rdd.coalesce(n)(减少分区,避免数据洗牌)
  2. 增加 Executor 核心数
    • --executor-cores N
    • --num-executors M
  3. 增加 Task 并发
    • spark.default.parallelism(全局默认并行度)
    • spark.sql.shuffle.partitions(SQL Shuffle 时的分区数)
      示例:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 10) // 增加分区数提高并行度

总结

影响因素说明影响并行度
RDD 分区数任务并行度取决于分区数分区数越多并行度越高
Executor 数量任务运行的执行节点数量Executors 越多并行度越高
Executor 核心数每个 Executor 可并行运行的 Task 数核心数越多并行度越高
Task 并发数Task 调度和 CPU 资源影响并发Task 数量受 CPU 资源限制

🔥 最佳实践:

  • 大数据计算时,确保 RDD 分区数 ≥ 任务 CPU 核心数,以充分利用计算资源。
  • 避免单个 Task 计算过长,导致 CPU 资源利用率低下
  • Spark SQL 计算时,适当调整 spark.sql.shuffle.partitions(默认 200),减少 Shuffle 代价。

🚀 结论:
Spark 是 并行计算框架,并行度主要由 RDD 分区数、Executor 数量、CPU 核心数、任务调度 共同决定,合理调整参数可以优化计算性能。


http://www.kler.cn/a/567444.html

相关文章:

  • final 关键字在不同上下文中的用法及其名称
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_open_file
  • 性能测试监控工具jmeter+grafana
  • ave-form.vue 组件中 如何将产品名称发送给后端 ?
  • Unity插件-Mirror使用方法(二)组件介绍
  • 【学术会议论文投稿】Spring Boot实战:零基础打造你的Web应用新纪元
  • C++之 “” 用法(总结)
  • 【Oracle脚本】消耗CPU高的SQL抓取
  • JavaPro _JVM 知识点速记 JVM大全
  • 【AVL树】—— 我与C++的不解之缘(二十三)
  • GitCode 助力 python-office:开启 Python 自动化办公新生态
  • 机器学习的通用工作流程
  • 若依框架修改为多租户
  • OptiTrack光学跟踪系统:引领工厂机器人应用的革新浪潮
  • 克隆项目到本地
  • C++(Qt)软件调试---Linux 性能分析器perf(29)
  • lua学习(二)
  • Compose笔记(七)--Modifier
  • rustup-init.exe 安装缓慢的解决办法
  • CSS浮动详解