详解Spark executor
在 Apache Spark 中,Executor(执行器) 是运行在集群工作节点(Worker Node)上的进程,负责执行具体的计算任务并管理数据。它是 Spark 分布式计算的核心组件之一,直接决定了任务的并行度和资源利用率。以下是 Executor 的详细解析:
1. Executor 的核心职责
职责 | 说明 |
---|---|
执行 Task | 运行 Driver 分配的 Task(包括 Shuffle Map Task 和 Result Task)。 |
数据存储 | 缓存 RDD 的分区数据(通过内存或磁盘),加速后续计算。 |
Shuffle 处理 | 处理 Shuffle 操作(如排序、聚合、溢写磁盘)。 |
与 Driver 通信 | 向 Driver 发送心跳,报告 Task 状态和块(Block)信息。 |
资源管理 | 管理分配给它的内存和 CPU 核心,确保任务高效运行。 |
2. Executor 的内部结构
(1) 线程池(Task Runner Threads)
- 每个 Executor 内部维护一个线程池,线程数由
spark.executor.cores
决定。 - 每个线程处理一个 Task,实现并行计算。
- 示例:若
spark.executor.cores=4
,则 Executor 最多同时运行 4 个 Task。
(2) 内存管理
- Executor 的内存分为两部分(通过
spark.memory.fraction
配置比例):- Execution Memory:用于计算(如 Shuffle、Join、Sort 的临时内存)。
- Storage Memory:用于缓存 RDD 和广播变量。
- 溢出机制:当内存不足时,数据溢写到磁盘(可能影响性能)。
(3) BlockManager
- 管理 Executor 的数据块(Block),包括本地和远程数据。
- 负责与其他 Executor 交换 Shuffle 数据。
3. Executor 的启动与资源分配
(1) 资源申请
- Driver 通过集群管理器(如 YARN、Kubernetes)申请 Executor 资源。
- 关键配置参数:
spark.executor.instances
:Executor 数量。spark.executor.memory
:每个 Executor 的内存(如4g
)。spark.executor.cores
:每个 Executor 的 CPU 核心数。
(2) Executor 启动流程
- Driver 向集群管理器发送资源请求。
- 集群管理器(如 YARN 的 ResourceManager)分配 Container。
- 在 Container 中启动
CoarseGrainedExecutorBackend
进程。 - Executor 向 Driver 注册,准备接收 Task。
4. Executor 与 Task 的执行
(1) Task 分发
- Driver 将 Task 序列化后发送给 Executor。
- Executor 反序列化 Task 代码并执行。
(2) 数据本地性(Locality)
- Executor 优先处理存储在本地的数据(如 HDFS 块),减少网络传输。
- 本地性级别:
PROCESS_LOCAL
>NODE_LOCAL
>RACK_LOCAL
>ANY
。
(3) Shuffle 过程
- Map 阶段:Executor 将 Shuffle 数据写入本地磁盘(或内存)。
- Reduce 阶段:Executor 从其他节点拉取 Shuffle 数据。
5. Executor 的容错机制
- Task 失败重试:若某个 Task 失败,Driver 会重新调度该 Task(最多
spark.task.maxFailures
次)。 - Executor 崩溃:
- Driver 检测到 Executor 失联后,向集群管理器申请新 Executor。
- 丢失的缓存数据需重新计算(依赖 RDD 血统)。
6. 配置优化与常见问题
(1) 内存配置优化
- 避免 OOM:
- 增加
spark.executor.memory
。 - 调整
spark.memory.fraction
(默认 0.6)和spark.memory.storageFraction
(默认 0.5)。
- 增加
- 示例配置:
spark-submit \ --executor-memory 8g \ --executor-cores 4 \ --conf spark.memory.fraction=0.7
(2) 并行度与数据倾斜
- 合理分区:确保每个 Task 处理的数据量均衡(通过
repartition
或调整分区数)。 - 处理倾斜:使用
salting
或自定义分区器。
(3) GC 调优
- 启用 G1 垃圾回收器(减少停顿时间):
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC"
7. Executor 与 Driver 的对比
特性 | Executor | Driver |
---|---|---|
角色 | 执行任务的“工人” | 协调任务的“指挥官” |
运行位置 | 集群的工作节点(Worker Node) | 客户端或集群节点(取决于部署模式) |
数据访问 | 仅处理分配的分区数据 | 可访问全局数据(如 collect() 结果) |
容错 | 无状态,失败后由 Driver 重新调度 Task | 单点故障,崩溃则整个应用失败 |
8. 典型问题与解决方案
(1) Executor 频繁 Full GC
- 现象:任务停滞,日志显示 GC 时间过长。
- 解决:
- 增加 Executor 内存。
- 减少缓存数据量,或使用序列化缓存(
MEMORY_ONLY_SER
)。
(2) Shuffle 数据溢出到磁盘
- 现象:任务变慢,磁盘 I/O 高。
- 解决:
- 增加
spark.executor.memory
。 - 优化 Shuffle 操作(如减少
groupByKey
,改用reduceByKey
)。
- 增加
(3) Executor 失联
- 现象:Driver 日志显示
ExecutorLostFailure
。 - 解决:
- 检查集群资源是否充足(如 YARN 资源队列)。
- 增加
spark.network.timeout
(默认 120s)。
总结
Executor 是 Spark 分布式计算的执行单元,负责 Task 运行、数据缓存和 Shuffle 处理。合理配置 Executor 的数量、内存和核心数是优化 Spark 应用性能的关键。通过调整资源参数、优化数据本地性和处理倾斜问题,可以显著提升任务的执行效率。