当前位置: 首页 > article >正文

Spark 运行时对哪些数据会做缓存?

在Spark应用执行过程中,某些需要多次使用或者重新计算的数据会进行缓存,为后续更多的计算操作复用,避免了再次计算,从而减少应用的执行时间,加速整体计算进度。

那么,在计算过程中,哪些数据是需要被缓存的。接下来我们通过一个简单的例子来回答一下。

如下示例,首先对输入数据进行map()计算,得到 mappedRDD,然后再对 mappedRDD 分别进行两种计算:reduceByKey + foreach(println) 和 groupByKey + foreach(println)。

//输入数据
var inputRDD = sc.parallelize(Array[(Int,String)])(1,"a",2,"b",3,"c",4,"d",5,"e",3,"f",2,"g",1,"h",2,"i",
),3)
val mappedRDD = inputRDD.map(r => (r._1 + 1, r._2))
// mappedRDD.cache()
val reducedRDD = mappedRDD.reduceByKey((x,y) => x + "_" + y, 2)
reducedRDD.foreach(println)
val groupedRDD = mappedRDD.groupByKey(3).mapValues(V => V.toList)
groupedRDD.foreach(println)

由于应用中存在两个foreach()行动算子,那么就会形成两个job,并且这两个job都是从inputRDD开始计算的。如下图所示。

在这里插入图片描述

不难看出,在这两个job中,inputRDD => mappedRDD 的计算流程都是一样的,理论上第二个job可以直接从 mappedRDD 开始进行计算的。

我们在代码中取掉mappedRDD.cache()语句的注释,声明一下mappedRDD需要被缓存。

需要注意的是:cache() 操作表示将数据直接写入到内存中,并且cache() 是lazy操作,并不会立即执行的,只有在第一个job运行时才将要缓存的数据写入内存中。

//输入数据
var inputRDD = sc.parallelize(Array[(Int,String)])(1,"a",2,"b",3,"c",4,"d",5,"e",3,"f",2,"g",1,"h",2,"i",
),3)
val mappedRDD = inputRDD.map(r => (r._1 + 1, r._2))
mappedRDD.cache()
val reducedRDD = mappedRDD.reduceByKey((x,y) => x + "_" + y, 2)
reducedRDD.foreach(println)
val groupedRDD = mappedRDD.groupByKey(3).mapValues(V => V.toList)
groupedRDD.foreach(println)

如下图所示,对 mappedRDD 进行缓存之后,可以避免第二个job再重复map() 计算。

在这里插入图片描述

但是带来的代价就是会只占用很多内存空间来缓存数据,试想,如果mappedRDD 包含了上亿个record,那么其存储将带来很大的内存消耗。这时设置缓存并非最优解,需要权衡计算成本和存储成本。

在本例中,map()操作的计算逻辑很简单,只需要少量的计算成本,如果mappedRDD需要很大的存储空间时,那么就不会对其进行缓存。

缓存机制实际上是一种用空间来换时间的方式,那么如何判断数据是否应该被设置缓存呢?

在 spark 缓存机制中,只要满足如下三个条即可:
(1)会被重复使用的数据。并且被重复使用次数越多,那么缓存带来的性价比也会越高。一般来说,迭代型和交互型的应用会比较适合。
(2)数据不适合过大。因为数据量级过大,会占用大量的存储空间,导致内存不足,会直接降低数据计算可使用的空间(可参考上一篇spark内存的介绍)。虽然说缓存过大也可以存放在磁盘中,但是磁盘IO的成本也比较高,甚至不如重新计算的成本低。
(3)非重复缓存的数据。什么意思呢?其实就是如果缓存了某RDD,那么与其存在一对一血缘的父RDD就不需要缓存了。在本例中,如果对 mappedRDD 进行了缓存,那么就没有必要对 inputRDD 进行缓存了。除非有新的job会重复用到inputRDD且不会用到 mappedRDD。

除此之外,不仅RDD可以被缓存,广播数据task计算结果数据也可以被缓存。


http://www.kler.cn/a/445102.html

相关文章:

  • 如何设计一个秒杀系统
  • js常用方法之: 预览大图(uniapp原生方法封装)
  • 环境变量的知识
  • Nginx(Linux之Ubuntu)
  • CRYPTO密码学
  • level2逐笔委托查询接口
  • 怎样衡量电阻负载的好坏
  • (六)Spring Cloud Alibaba 2023.x:Sentinel 流量控制与熔断限流实现
  • 2024年12月16日Github流行趋势
  • mfc140.dll文件缺失的修复方法分享,全面分析mfc140.dll的几种解决方法
  • 如何使用git新建本地仓库并关联远程仓库的步骤(详细易懂)
  • LoadBalancer负载均衡和Nginx负载均衡区别理解
  • ubuntu解决ssh连接:Permission denied (publickey)
  • 【Linux】AlmaLinux 9.5虚拟机安装过程记录分享
  • ubuntu--用户
  • 【vue】npm install 报错 python2 Error: not found: python2
  • Day27 C++ 动态内存
  • ArcGIS Pro 3.4新功能3:空间统计新特性,基于森林和增强分类与回归,过滤空间自相关
  • CTFHUB 历年真题 afr-1
  • 如何编辑调试gradle,打印日志
  • upload-labs靶场
  • uniapp小程序抽奖怎么做?直接使用【almost-lottery转盘组件】或者【自定义宫格转盘】
  • SL4008B升降压芯片 9-36V降12V/2A 耐压40V 外置MOS管 80W大功率IC
  • 自动化运维平台的选型指南:开源与商业化工具对比
  • TypeScript 与前端框架React
  • 解决git报错:fatal: unable to connect to cache daemon: Unknown error