当前位置: 首页 > article >正文

Spark RDD

概念

RDD是一种抽象,是Spark对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体

RDD 与 数组对比

对比项数组RDD
概念类型数据结构实体数据模型抽象
数据跨度单机进程内跨进程、跨计算节点
数据构成数组元素数据分片(Partitions)
数据定位数组下标、索引数据分片索引

RDD 4大属性

partitions 数据分片: 数据属性

partitioner 分片切割规则: 定义了把原始数据集切割成数据分片的切割规则

dependencies RDD依赖: 每个RDD都会通过dependencies属性来记录它所依赖的前一个、或是多个RDD,简称“父RDD”

compute 转换函数: 与此同时,RDD使用compute属性,来记录从父RDD到当前RDD的转换操作

例子

在这里插入图片描述

不同的食材形态,如带泥土豆、土豆片、即食薯片等等,对应的就是RDD概念
同一种食材形态在不同流水线上的具体实物,就是 RDD 的 partitions 属性
食材按照什么规则被分配到哪条流水线,对应的就是 RDD 的 partitioner 属性
每一种食材形态都会依赖上一种形态,这种依赖关系对应的是 RDD 中的 dependencies 属性
不同环节的加工方法对应 RDD的 compute 属性

RDD 编程模型

在RDD的编程模型中,一共有两种算子,Transformations 类算子和Actions类算子

开发者需要使用Transformations 类算子,定义并描述数据形态的转换过程,然后调用Actions类算子,将计算结果收集起来、或是物化到磁盘

延迟计算

RDD编程模型下,Spark在运行时的计算被划分为两个环节

  • 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)
  • 通过Actions类算子,以回溯的方式去触发执行这个计算流图

换句话说,开发者调用的各类Transformations算子,并不立即执行计算

当且仅当开发者调用Actions算子时,之前调用的转换算子才会付诸执行

常用算子

算子类型适用范围算子用途算子集合
Transformations任意RDDRDD内数据转换map、mapPartitons、mapPartitonsWithIndex、flatMap、 filter
Paired RDDRDD内数据耦合groupByKey、sortByKey、reduceByKey、aggregateByKey
任意RDDRDD间数据整合union、intersection、join、cogroup、cartesian
任意RDD数据整理sample、distinct
Actions任意RDD数据收集collect、first、take、takeSample、takeOrdered、count
任意RDD数据持久化saveAsTextFile、saveAsSequenceFile、saveAsObjectFile
任意RDD数据遍历foreach

map: 元素为粒度对RDD做数据转换

val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result: RDD[Int] = rdd.map(x => x + 1)
result.collect() // 返回 Array(2, 3, 4, 5, 6)

在这个例子中,我们使用 parallelize 方法创建一个包含整数的 RDD
接着,我们使用 map 算子将 RDD 中的每个整数都加上 1,生成一个新的 RDD
最后,我们使用 collect 方法将新的 RDD 中的元素取回到驱动程序中

mapPartitons: 以数据分区为粒度,使用映射函数f对RDD进行数据转换

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
val result = rdd.mapPartitions(partition => {
  val sum = partition.sum
  Iterator(sum)
}).collect()

这个例子中,我们首先创建了一个包含10个元素的RDD,并将其分成3个分区
然后,我们使用mapPartitions算子,以数据分区为粒度进行转换
在这个例子中,我们使用partition.sum计算每个数据分区的和,并将其放入一个新的迭代器中
最后,我们使用collect算子将结果收集到本地
这个例子展示了如何使用mapPartitions算子以数据分区为单位进行操作,从而提高执行效率

flatMap:从元素到集合、再从集合到元素

val sentences: RDD[String] = sc.parallelize(List("Hello world", "How are you", "I am fine"))
val words: RDD[String] = sentences.flatMap(sentence => sentence.split(" "))

在上面的代码中,我们首先创建了一个包含多个句子的 RDD,然后使用 flatMap 方法,对每个句子进行拆分操作
具体地,对于 RDD 中的每个元素(即句子),我们都将其分割成单词,然后使用 yield 关键字将每个单词作为一个新的元素返回
最终,我们得到了一个包含所有单词的 RDD

filter:过滤RDD

val numbersRDD = sc.parallelize(Seq(-2, 0, 5, -10, 7, -3, 9))
val positiveNumbersRDD = numbersRDD.filter(x => x >= 0)
positiveNumbersRDD.foreach(println) // RDD[0, 5, 7, 9]

我们创建了一个包含数字的RDD,然后使用filter算子过滤掉其中的负数,最终返回一个新的RDD,只包含正数

mapPartitionsWithIndex: 每个元素映射为一个包含索引和单词的元组

val data = List("apple", "banana", "orange", "grape", "pear")
val rdd = sc.parallelize(data, 2)

val result = rdd.mapPartitionsWithIndex { (index, partition) =>
  partition.map(word => (index, word))
}

result.foreach(println)

这段代码创建了一个包含 5 个元素的列表,并将其转换为一个包含 2 个分区的 RDD
接着,使用 mapPartitionsWithIndex 函数将每个元素映射为一个包含索引和单词的元组,最后打印出结果

在这个例子中,mapPartitionsWithIndex 函数的输入函数接受两个参数:分区索引和分区中的元素迭代器
元素迭代器包含了分区中的所有元素,因此我们可以在其中使用 map 函数对所有元素进行操作
最终的输出结果是一个包含索引和单词的元组的 RDD

groupByKey:分组收集

groupByKey的字面意思是“按照Key做分组”,但实际上,groupByKey算子包含两步,即分组和收集


http://www.kler.cn/a/372535.html

相关文章:

  • 为什么你的 Qt 应用程序会出现 xcb 插件错误
  • 1月21日星期二今日早报简报微语报早读
  • JDBC实验测试
  • 【深度学习】利用Java DL4J 训练金融投资组合模型
  • Linux(centos)安装 MySQL 8 数据库(图文详细教程)
  • leetcode——找到字符串中所有字母异位词(java)
  • C# CSV工具类,读取csv文件、将数据导出为csv文件格式,用DataGridView表格控件显示
  • 批量删除redis数据
  • N9300-S16语音芯片:提升电梯播报体验,实现导航声音播报提示
  • Spring Boot 创建项目详细介绍
  • list ------ 是一个带头双向循环的列表
  • 从0到1,解读安卓ASO优化!
  • At dp综合
  • react基础之reactHooks
  • JetPack Compose安卓开发之底部导航Tabbar
  • Windows on ARM上使用sherpa-onnx实现语音识别
  • MFC界面开发组件Xtreme Toolkit Pro v24全新发布—完整的SVG支持
  • FPGA 第一讲
  • 快速生成高质量提示词,Image to Prompt 更高效
  • 简道云和企业微信数据同步集成案例
  • Python 操作 读/写 Excel
  • 电科金仓(人大金仓)更新授权文件(致命错误: XX000: License file expired.)
  • 鸿蒙系统开发入门:一步步踏上创新之旅
  • conda使用指南
  • 100种算法【Python版】第24篇——Bellman-Ford算法
  • Java异常2