当前位置: 首页 > article >正文

Spark 中 cache、persist 和 checkpoint 优化数据处理的三种重要机制介绍

在 Spark 中,cachepersistcheckpoint 是优化数据处理的三种重要机制。它们都旨在减少数据重算和优化性能,但有各自的应用场景和实现原理。下面从源码角度分析其原理、作用和适用场景。


1. 基本概念和作用

机制作用存储介质
cache将数据存储在内存中以加快后续计算速度。默认存储在内存
persist提供多种存储级别(如内存和磁盘),支持灵活选择存储策略。内存、磁盘、堆外内存等
checkpoint将数据保存到可靠的存储系统(如 HDFS),提供容错能力,打断 DAG 依赖链。HDFS 或其他持久化存储

2. 核心原理

2.1 Cache

cachepersist 的简化版,其底层实现直接调用 persist(StorageLevel.MEMORY_AND_DISK),默认将数据存储在内存中,如果内存不足,则溢写到磁盘。

源码分析

  • 在 RDD 中,cache() 的代码:
    def cache(): this.type = persist(StorageLevel.MEMORY_AND_DISK)
    
  • persist 方法核心逻辑:
    def persist(newLevel: StorageLevel): this.type = {
      if (storageLevel != StorageLevel.NONE && storageLevel != newLevel) {
        throw new UnsupportedOperationException("Cannot change storage level...")
      }
      storageLevel = newLevel
      this
    }
    
  • 执行时,RDD 的 computeOrReadCheckpoint 方法判断是否已经缓存:
    if (isCached) {
      SparkEnv.get.blockManager.getOrElseUpdate(blockId, ...)
    } else {
      compute(split, context)
    }
    

作用

  • 加速重复计算:避免重复计算 DAG 中的父节点。
  • 默认存储级别为 MEMORY_AND_DISK,当内存不足时,溢写磁盘。

适用场景

  • 数据需要被多次使用,但不需要跨作业的容错能力。
  • 计算代价大,但内存能够容纳数据。

2.2 Persist

persistcache 的增强版,允许用户选择存储级别(StorageLevel),如:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • DISK_ONLY
  • 堆外内存、序列化存储等。

源码分析

  • StorageLevel 是一个枚举类,定义了各种存储级别:
    case class StorageLevel(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int
    )
    
  • persist 方法直接调用 BlockManager 存储数据,核心逻辑:
    blockManager.putIterator(
      blockId,
      values,
      level,
      tellMaster = true
    )
    

作用

  • 提供更灵活的存储策略,适应内存、磁盘等不同环境。

适用场景

  • 数据较大,内存无法完全容纳,需要存储到磁盘或其他媒介。
  • 数据跨作业使用时(需确保存储级别满足作业要求)。

2.3 Checkpoint

checkpoint 会将 RDD 的数据保存到可靠存储(如 HDFS),并将 RDD 的依赖链打断,从而减少 DAG 深度,增强容错能力。

源码分析

  • RDDcheckpoint 方法:
    def checkpoint(): Unit = synchronized {
      if (doCheckpoint()) { // 检查是否需要 checkpoint
        val newRDD = new CheckpointRDD(this)
        this.rdd = newRDD // 更新依赖为 CheckpointRDD
      }
    }
    
  • CheckpointRDD 会从持久化存储中加载数据:
    override def compute(split: Partition, context: TaskContext): Iterator[T] = {
      val path = getCheckpointPath(split)
      val data = loadFromHDFS(path)
      data.iterator
    }
    

作用

  • 容错:数据保存到可靠存储中。
  • 优化 DAG:打断长依赖链,减少重算开销。

适用场景

  • 作业链较长,DAG 深度过大,容易导致重算开销。
  • 需要跨作业使用 RDD 数据,且要求数据容错性强。

3. 使用对比

特点CachePersistCheckpoint
存储位置内存(默认)或磁盘溢写多种存储级别可靠存储系统(如 HDFS)
容错性低,数据丢失需重算低至中,取决于存储级别高,数据可靠存储
DAG 优化有,打断依赖链
开销较低高(需要持久化和 I/O 操作)

4. 使用场景总结

Cache
  • 数据需要被频繁多次使用,且内存能够容纳。
  • 例如:在机器学习中对训练数据进行多次迭代。
Persist
  • 数据规模较大,内存无法完全容纳,需结合磁盘。
  • 例如:图计算中存储中间结果,避免重复计算。
Checkpoint
  • 作业链较长,可能因 DAG 深度导致失败或性能下降。
  • 需要跨作业的容错能力。
  • 例如:深度学习中的训练数据预处理、长链条依赖的 ETL 作业。

5. 综合优化建议

  • 优先考虑 cachepersist:仅当 DAG 深度问题显著时,使用 checkpoint
  • 设置合理的存储级别:根据内存和磁盘资源选择最优存储策略。
  • 结合 checkpointpersist:在 Checkpoint 前对数据 Persist,避免重新计算数据。

http://www.kler.cn/a/401704.html

相关文章:

  • NLP论文速读(EMNLP 2024)|动态奖励与提示优化来帮助语言模型的进行自我对齐
  • AJAX笔记 (速通精华版)
  • STM32 独立看门狗(IWDG)详解
  • Unix发展历程的深度探索
  • linux004.在ubuntu中smb.conf配置文件中配置内容详解
  • 每日OJ题_牛客_天使果冻_递推_C++_Java
  • 视频直播5G CPE解决方案:ZX7981PG/ZX7981PMWIFI6网络覆盖
  • Go 并发
  • windows已建立威胁IP排查
  • R语言基础入门详解
  • 【list的模拟实现】—— 我与C++的模拟实现(十四)
  • 经典的网络安全技术
  • 解决在使用JetBrains IDEs(如PyCharm或CLion)进行GitHub项目分享时,用户经常遇到“此站点的访问已被限制”的问题
  • 相机标定原理
  • SpringBoot升级全纪录之项目启动
  • Acme PHP - Let‘s Encrypt
  • 卷积神经网络之Yolo详解
  • Kotlin的data class
  • JSP是如何被执行的?
  • LabVIEW多通道面阵烟雾透过率测试系统
  • VSCode自定义插件创建教程
  • 软间隔支持向量机支持向量的情况以及点的各种情况
  • Java集合分页
  • uni-app快速入门(十二)--常用API(中)
  • 【Vim/Vi/Gvim操作】:列操作
  • SpringcloudAlibaba详解---超详细