当前位置: 首页 > article >正文

RDD 算子全面解析:从基础到进阶与面试要点

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

目录

一、RDD 算子分类

(一)Transformation 算子(转换算子)

(二)Action 算子(触发算子 / 行为算子)

(三)各个算子的作用,对比sql中的关键字

二、常用转换算子详细解析

(一)map 算子

(二)flatMap 算子

(三)filter 算子

三、常见触发算子详细解析

(一)count 算子

(二)foreach 算子

(三)saveAsTextFile 算子

四、其他转换算子

(一)union 算子

(二)distinct 算子

(三)分组聚合算子:groupByKey、reduceByKey

(四)排序算子:sortBy、sortByKey

sortBy

sortByKey

(五)重分区算子:repartition、coalesce

repartition

coalesce

五、其他触发算子

(一)first 算子

(二)take 算子

(三)collect 算子

(四)reduce 算子

(五)TopN 算子:top、takeOrdered

top 算子

takeOrdered 算子

六、算子的其他方面

(一)面试题:groupByKey + map 和 reduceByKey 的区别

区别

map 端的聚合(Combiner)

(二)其他 KV 类型算子

keys

values

mapValues

collectAsMap

(三)join 方面的算子

join / fullOuterJoin / leftOuterJoin / rightOuterJoin

(四)分区算子

为什么需要分区算子

mapPartitions

foreachParition

七、总结

(一)触发算子

(二)转换算子

(三)能触发shuffle过程的算子


        在大数据处理领域,Spark 中的 RDD(弹性分布式数据集)是核心概念之一。RDD 算子则是对 RDD 进行操作的关键工具,它们决定了数据的处理方式和流程。深入理解 RDD 算子对于高效地使用 Spark 处理大规模数据至关重要。本文将详细介绍 RDD 的常用基础算子,包括算子的分类、功能、代码示例、常见问题以及面试相关要点。

一、RDD 算子分类

        Spark为了避免资源浪费,将RDD的读取、转换设计为lazy模式 【只定义,不执行】需要等待真正使用到对应RDD的数据返回给用户时,才真正的执行所有RDD的构建和转换。

(一)Transformation 算子(转换算子)

  1. 特点
    • 处于 lazy 模式,一般不会触发 job 的运行。这意味着只有当需要使用该算子处理后的数据时,才会真正执行相关的计算。
    • 算子返回值一定是 RDD。这种设计使得可以对 RDD 进行连续的转换操作,构建复杂的数据处理管道。
  2. 常见的 Transformation 算子
  • map:对 RDD 中的每个元素进行一对一的映射操作。
  • filter:根据给定的条件过滤 RDD 中的元素。
  • flatMap:对 RDD 中的每个元素进行操作,将每个元素映射为 0 个或多个新元素,并将结果扁平化。
  • reduceByKey:针对键值对(KV)类型的 RDD,根据相同的 key 对 value 进行聚合计算。
  • groupByKey:对 KV 类型的 RDD 按照 Key 进行分组,将相同 key 的 value 放入一个集合列表中。
  • sortByKey:对 KV 类型的 RDD 按照 key 进行排序。

(二)Action 算子(触发算子 / 行为算子)

  1. 特点
    • 一定会触发 job 的运行,这是与 Transformation 算子的重要区别。当执行 Action 算子时,Spark 会开始执行之前定义的一系列 Transformation 操作。
    • 返回值一定不是 RDD。根据不同的 Action 算子,返回值类型各异,如单个元素、集合、写入文件等操作的结果。
  2. 常见的 Action 算子
  • foreach:对 RDD 中的每个元素执行给定的函数。通常用于对 RDD 中的数据进行输出或其他副作用操作。
  • first:返回 RDD 中的第一个元素。
  • count:返回 RDD 中的元素个数。
  • reduce:对 RDD 中的元素进行聚合操作,需要提供一个聚合函数。
  • saveAsTextFile:将 RDD 中的数据保存为文本文件。
  • collect:将 RDD 中的所有元素收集到驱动程序中,形成一个本地集合。但要注意,如果 RDD 数据量过大,可能会导致内存溢出。
  • take:返回 RDD 中的前 n 个元素。

(三)各个算子的作用,对比sql中的关键字

类比SQL处理数据的常见功能,记住常用算子的功能、语法、场景

过滤数据:where、having => filter

处理数据:字符串函数、日期函数 => map

展开数据:explode => flatMap合并数据:union、join => union join

去重数据:distinct => distinct

分组聚合:group by + 聚合函数 => groupByKey、 reduceByKey

排序数据:order by 、sort by => sortBy、top

二、常用转换算子详细解析

(一)map 算子

        map 算子对 RDD 中的每个元素进行一对一的映射。它接受一个函数作为参数,该函数应用于 RDD 中的每个元素。例如,假设我们有一个存储学生成绩的 RDD,每个元素是一个学生的成绩,我们可以使用 map 算子将每个成绩转换为等级(如 90 分及以上为 A,80 - 89 分为 B 等)。这种转换不会改变 RDD 的元素个数,只是对每个元素的值进行了修改。

功能特点

功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果

举例说明

需求:计算每个元素的立方
原始数据
 1 2 3 4 5 6
目标结果
 1 8 27 64 125 216

list01 = [1,2,3,4,5,6]
	listRdd = sc.parallelize(list01)
	mapRdd = listRdd.map(lambda x: math.pow(x,3))
	mapRdd.foreach(lambda x: print(x))

(二)flatMap 算子

        flatMap 算子在处理数据时,先对每个元素应用一个函数,这个函数返回一个可迭代对象,然后将所有这些可迭代对象扁平化。例如,在处理文本数据时,如果我们有一个 RDD,其中每个元素是一个段落,我们可以使用 flatMap 算子将每个段落拆分成单词,然后将所有单词组成一个新的 RDD。这对于后续的文本分析任务,如单词计数、词频统计等非常有用。

功能特点

功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]

Iterable :传递进来的数据,必须至少是Iterable  ,这个类中要实现 __iter__
Iterator: 迭代器      __next__ 以及 __iter__

判断一个对象是否为可迭代数据类型:
print(isinstance(map(str, [10, 20, 30]), Iterator))  # True

举例说明

需求:返回为一个可迭代对象

夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
月亮之上/荷塘月色

fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))

原理:
[ 夜曲/发如雪/东风破/七里香,十年/爱情转移/你的背包 ...]
||
[ [夜曲,发如雪,东风破,七里香],[十年,爱情转移,你的背包.....]]
 ||
flatMap
 ||
[夜曲,发如雪,东风破,七里香,十年,爱情转移,你的背包]

(三)filter 算子

        filter 算子根据给定的条件筛选 RDD 中的元素。条件可以是任何返回布尔值的函数。例如,在处理电商订单数据的 RDD 时,我们可以使用 filter 算子筛选出特定地区的订单,或者筛选出金额大于某个阈值的订单。这种筛选操作可以大大减少后续处理的数据量,提高处理效率。

功能特点

功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进
行过滤,符合条件就保留,不符合就过滤
分类:转换算子
场景:行的过滤,类似于SQL中where或者having
 def filter(self, f: T -> bool ) -> RDD[T]

举例说明

需求:

1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 月亮之上/荷塘月色

fileRdd = sc.textFile("../datas/b.txt",2)
	# 这个说切割的时候有问题,数组下标越界了
	# fileRdd.foreach(lambda line: print(line.split(" ")))
	# filterRdd = fileRdd.filter(lambda line: line.split(" ")[2] != '-1' and len(line.split(" ")) == 4 )
	#
	filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4)
	filterRdd.foreach(lambda x: print(x))

三、常见触发算子详细解析

(一)count 算子

        count 算子用于计算 RDD 中的元素数量。它在实际应用中非常有用,比如我们需要知道某个数据集的大小,或者在对数据进行抽样后,计算抽样数据的数量。在 Spark 内部,count 算子会触发一个 job 的执行,遍历整个 RDD 来计算元素个数。

功能特点

count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:触发算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int

(二)foreach 算子

        foreach 算子对 RDD 中的每个元素执行指定的操作。这个操作通常是有副作用的,比如将数据写入外部存储系统、打印数据等。需要注意的是,由于 foreach 是在集群中的每个节点上执行,对于有状态的操作(如更新全局变量)需要谨慎处理,以免出现数据不一致或其他问题。

功能特点

功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】

(三)saveAsTextFile 算子

        saveAsTextFile 算子将 RDD 中的数据保存为文本文件。它会将 RDD 中的每个元素转换为字符串形式,并写入指定的文件路径。在保存过程中,Spark 会根据数据的分区情况将数据分布存储在多个文件中,以提高写入效率。这个算子常用于将处理后的结果保存下来,供后续分析或其他应用使用。

功能特点

功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None

转换算子、触发算子代码演示
import os
from pyspark import SparkContext, SparkConf


def getSongs(line):
	list = line.split()
	return list[-1]
if __name__ == '__main__':
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sc 对象
	conf = SparkConf().setMaster("local[2]").setAppName("分区的解释")
	sc = SparkContext(conf=conf)
	print(sc)
	# 编写各种需求代码
	# 使用parallelize 这个算子获取的rdd,假如指定了分区数,就按照数字分区,假如没有指定spark.default.parallelism
	rdd1 = sc.parallelize([1,2,3,4,5,6],numSlices=3)

	# map 算子  转换算子
	mapRdd = rdd1.map(lambda x:x*3)  #PipelinedRDD
	print(type(mapRdd))
	# mapRdd.foreach(print)

	# filter算子  转换算子
	filterRdd = mapRdd.filter(lambda x:  x>=9)
	print(type(filterRdd))  # PipelinedRDD
	print(filterRdd)
	filterRdd.foreach(print)

	# flatMap 算子  转换算子
	fileRdd = sc.textFile("../../datas/a.txt")
	#fileRdd.foreach(print)
	flatmapRdd = fileRdd.flatMap(lambda line: line.split("/"))
	filterRdd = flatmapRdd.filter(lambda song: len(song)==4)
	filterRdd.foreach(print)

	fileRdd = sc.textFile("../../datas/b.txt")
	# fileRdd.foreach(print)
	#mapRdd = fileRdd.map(lambda line:getSongs(line))
	mapRdd = fileRdd.map(getSongs)
	# mapRdd.foreach(print)
	mapRdd.foreach(lambda x:print(x))
	mapRdd.flatMap(lambda line: line.split("/")).filter(lambda song:len(song) ==4).foreach(print)

	# 常见的触发算子的用法
	# count 触发算子,返回值是一个int ,不是rdd
	print(mapRdd.count())

	# foreach 将rdd中的每一个元素,执行foreach 中的函数,没有返回值,跟map 有点像
	# foreach 没有返回值,是触发算子
	# map  有返回值,不是触发算子
	mapRdd.foreach(lambda line: print("~".join(line.split("/"))))

	# saveAsTextFile 是触发算子
	mapRdd.saveAsTextFile("../../datas/result")

	# 关闭sc
	sc.stop()

四、其他转换算子

(一)union 算子

        union 算子用于合并两个 RDD。这两个 RDD 的类型必须相同,合并后的 RDD 包含了两个原始 RDD 中的所有元素。例如,有两个分别存储不同地区用户信息的 RDD,我们可以使用 union 算子将它们合并成一个包含所有用户信息的 RDD。

功能特点

union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]

(二)distinct 算子

        distinct 算子用于去除 RDD 中的重复元素。它会对 RDD 中的所有元素进行去重操作。例如,在处理一个包含用户浏览记录的 RDD 时,可能存在用户多次浏览同一页面的情况,使用 distinct 算子可以去除这些重复的浏览记录。

功能特点

功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]

union、distinct 算子代码演示
import os
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    print(sc)
    # 编写各种需求代码
    list1 = [1, 2, 3, 4, 5, 6, 7, 8]
    list2 = [5, 6, 7, 8, 9, 10]
    rdd1 = sc.parallelize(list1,2)
    rdd2 = sc.parallelize(list2,2)
    # 将两个算子中的数据进行合并, 形成新的算子  转算算子
    rdd3 = rdd1.union(rdd2)
    # rdd3.foreach(print)

    rdd4 = rdd3.distinct()
    rdd4.foreach(print)
    # 关闭sc
    sc.stop()

(三)分组聚合算子:groupByKey、reduceByKey

groupByKey

  • 功能:对 KV 类型的 RDD 按照 Key 进行分组,将相同 K 的 Value 放入一个集合列表中。例如,在处理日志数据时,我们可以根据用户 ID 对用户的操作记录进行分组。
  • 语法:RDD[K,V].groupByKey => RDD[K, List[V]]。它可以指定新的 RDD 分区个数和分区规则。
  • 场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce

  • 特点:必须经过 Shuffle 过程。在处理大规模数据时,Shuffle 可能会带来较大的性能开销,需要合理设计分区等参数。

reduceByKey

  • 根据 key 值对 value 进行合并计算。与 groupByKey 不同,它在 Shuffle 之前会在每个分区内先进行预聚合(类似于 MapReduce 中的 Combiner),这样可以减少网络传输和后续聚合的计算量,性能通常比 groupByKey 好。因此,在可以使用 reduceByKey 的情况下,尽量不使用 groupByKey + map 的方式来实现分组聚合。

groupByKey、reduceByKey 算子代码演示
import os
from pyspark import SparkContext, SparkConf


def showMsg(name,age):
    print(name,age)
def showMsg2(a,b,c):
    print(a,b,c)
if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    print(sc)
    # 编写各种需求代码
    list1 = [1, 2, 3, 4, 5, 6, 7, 8]
    list2 = [5, 6, 7, 8, 9, 10]
    rdd1 = sc.parallelize(list1, 2)
    rdd2 = sc.parallelize(list2, 2)
    # 将两个算子中的数据进行合并, 形成新的算子  转算算子
    rdd3 = rdd1.union(rdd2)
    # rdd3.foreach(print)

    rdd4 = rdd3.distinct()
    rdd4.foreach(print)

    # groupByKey   转换算子,只对 KV键值对的RDD 起作用
    rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
    rdd6 = rdd5.groupByKey()  # ("word",List[10,5])
    rdd6.foreach(lambda x: print(x[0], *x[1]))

    list3 = [10, 20, 30]
    print(*list3)
    showMsg2(*list3)
    dict = {"name": "zhangsan", "age": 29}
    showMsg(**dict)

    # reduceByKey算子
    rdd7 = rdd5.reduceByKey(lambda total,num: total * num)
    rdd7.foreach(print)
    # 关闭sc
    sc.stop()

(四)排序算子:sortBy、sortByKey

sortBy

        可以根据指定的函数对 RDD 中的元素进行排序。例如,按照用户年龄降序排序,我们可以定义一个根据年龄提取值的函数作为排序依据。

功能特点

功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序

举例说明

需求:按照用户年龄降序排序

laoda,20,male
laoer,22,female
laoliu,28,middle
laosan,24,male
laosi,30,male
laowu,26,female

 fileRdd = sc.textFile("../../datas/c.txt")
 fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)

sortByKey

        专门用于对 KV 类型的 RDD 按照 key 进行排序。这种排序方式在处理键值对数据时非常方便,比如对单词和其出现次数的 RDD 按照单词字典序排序。

功能特点

功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
使用这个算子,还想得到以上的需求的结果,必须让年龄为key

sortBysortByKey 算子代码演示
import os
from pyspark import SparkContext, SparkConf

"""
------------------------------------------
  Description : 
  SourceFile : _05其他转换算子
  Author  : 闫哥
  Date  : 2024/4/19
-------------------------------------------
"""

if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    print(sc)
    
    # sortBy 
    fileRdd = sc.textFile("../../datas/c.txt")
    fileRdd.sortBy(lambda line:line.split(",")[1],ascending=False).foreach(print)

    # sortByKey  对KV类型的RDD进行排序
    rdd5 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
    #rdd5.sortByKey(ascending=False).foreach(print)

    # 假如你想根据value排序,怎么办?
    rdd5.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print)

    # 关闭sc
    sc.stop()

(五)重分区算子:repartition、coalesce

        这两个算子都涉及 Shuffle 过程。repartition 底层是 coalesce(shuffle=True),repartition 可以将分区变大或变小,而 coalesce 默认情况下只能将分区变小,如果设置 shuffle=True,也可以将分区变大。在处理数据倾斜等问题时,重分区算子可以帮助重新调整数据分布,提高计算效率。

repartition

        底层是 coalesce(shuffle=True),repartition 可以将分区变大或变小

功能特点

功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]

 list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    # 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
    rdd = sc.parallelize(list01)
    print(rdd.getNumPartitions()) # 2
    # repartition 是一个转换算子,必然经历shuffle过程
    bigrdd = rdd.repartition(4)
    print(bigrdd.getNumPartitions()) # 4

coalesce

        默认情况下只能将分区变小,如果设置 shuffle=True,也可以将分区变大

功能特点

功能:调整RDD的分区个数
分类:转换算子

特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]

# 将一个小分区,变为大分区,shuffle 必须等于True,否则分区数不发生改变
    bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
    print(bigbigrdd.getNumPartitions())
    smallRdd = bigbigrdd.repartition(2)
    print(smallRdd.getNumPartitions())  # 8 -->2
    smallRdd2 = bigbigrdd.coalesce(2)
    print(smallRdd2.getNumPartitions())  # 8 --> 2
    # repartition(num) = coalesce(num,  shuffle=True)

五、其他触发算子

(一)first 算子

        first 算子返回 RDD 中的第一个元素。它在某些情况下很有用,比如快速查看数据集的一个示例数据。需要注意的是,如果 RDD 为空,执行 first 算子会抛出异常。

功能特点

功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T

(二)take 算子

        take 算子返回 RDD 中的前 n 个元素。这对于快速获取数据集的一小部分数据进行查看或初步分析非常方便。与 collect 不同,take 不会将整个 RDD 数据收集到驱动程序中,而是只获取指定数量的元素,因此更适合处理大数据集。

功能特点

功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大

举例说明

举例: [1,2,3,4,5,6,7,8,9]
假如是三个分区:
[1,2,3]
[4,5,6]
[7,8,9]
take(4)     1 2 3 4

(三)collect 算子

        collect 算子将 RDD 中的所有元素收集到驱动程序中,形成一个本地集合。虽然它可以方便地获取和处理 RDD 中的数据,但如果 RDD 数据量过大,可能会导致内存溢出。因此,在使用 collect 算子时,需要确保数据量在驱动程序的内存承受范围内。

功能特点

功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出

理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。

案例:在sortBy  sortByKey 中,如果不收集就打印的话,此时打印的是每一个分区的结果,为了看到全局排序的结果,此时你需要先collect 再 打印就能看到结果了。

(四)reduce 算子

        reduce 算子对 RDD 中的元素进行聚合操作。它接受一个二元函数作为参数,该函数用于将两个元素合并为一个新的元素。例如,计算 RDD 中所有整数的乘积,或者将字符串 RDD 中的所有字符串连接起来等。reduce 算子从 RDD 的第一个元素开始,依次将每个元素与前一个聚合结果进行合并,直到遍历完整个 RDD。

功能特点

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
reduceByKey(lambda tmp,item: tmp+item)

举例说明

# 一般用于KV键值对的数据
等同于:  select word,sum(value)  from a groub by word;

rdd:1 2 3 4 5 6 7 8 9 10
rdd.reduce(lambda tmp,item: tmp+item) = 55
# 一般用于正常数据
等同于:  select sum(1) * from a ;

first、take、conllect、reduce算子代码演示
import os
from pyspark import SparkContext, SparkConf


def getSongs(line):
	list = line.split()
	return list[-1]
if __name__ == '__main__':
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sc 对象
	conf = SparkConf().setMaster("local[2]").setAppName("分区的解释")
	sc = SparkContext(conf=conf)
	print(sc)
	# 编写各种需求代码
	# 使用parallelize 这个算子获取的rdd,假如指定了分区数,就按照数字分区,假如没有指定spark.default.parallelism
	rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],numSlices=3)
	print(rdd1.first()) # 1
	# take 数据量不能大,是将数据临时存储在driver进程中
	print(rdd1.take(4)) # [1, 2, 3, 4]
	# collect  将数据收集起来放在driver进程中
	print(rdd1.collect())

	print(rdd1.reduce(lambda total, num: total + num))

	rdd2 = sc.parallelize([("word",10), ("word",5), ("hello",100), ("hello",20), ("laoyan",1)], numSlices=3)
	# reduceBYKey 是一个 转换算子,reduce 是一个出发算子
	rdd2.reduceByKey(lambda total, num: total + num).foreach(print)


	# 关闭sc
	sc.stop()

(五)TopN 算子:top、takeOrdered

top 算子

        用于获取排好序之后的最大的几个值。例如,对于一个存储学生成绩的 RDD,我们可以使用 top 算子获取成绩最高的前几名学生的成绩。

功能特点

功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]

举例说明
    # top 
    list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    rdd = sc.parallelize(list01)
    # top 是一个触发算子,不返回rdd类型
    # 为什么 有时 用foreach打印,有时用print 打印
    # 对于转换算子的结果,还是rdd,对于rdd 使用foreach  1) rdd 循环打印  2) foreach 是触发算子
    # 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
    print(rdd.top(3))

takeOrdered 算子

        与 top 相反,它用于获取排好序之后的最小的几个值。比如,获取订单金额最小的几个订单信息。

功能特点

功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]

举例说明
    list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    rdd = sc.parallelize(list01)

    # takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
    print(rdd.takeOrdered(3))

六、算子的其他方面

(一)面试题:groupByKey + map 和 reduceByKey 的区别

区别
  • 计算方式groupByKey 只是简单地将相同 key 的 value 分组到一起,形成一个 value 的集合,然后如果要进行聚合操作,需要再使用 map 算子。而 reduceByKey 在 Shuffle 之前会在每个分区内先进行预聚合(类似 Combiner),减少了网络传输和后续聚合的计算量。
  • 性能reduceByKey 的性能通常更好,尤其是在数据量较大且存在较多重复 key 的情况下。因为它减少了不必要的数据传输和聚合计算。

map 端的聚合(Combiner)

        Combiner 类似于运行在 map 端的 Reduce。它在 map 阶段对本地数据进行初步聚合,然后再将聚合后的结果发送到 reduce 阶段。这样可以减少在网络传输过程中的数据量,提高整体性能。例如,在单词计数的场景中,在每个 map 任务中,可以先对本地出现的相同单词进行计数,然后再将结果传输到 reduce 任务进行最终的聚合。

(二)其他 KV 类型算子

keys

        keys 算子用于获取所有的 key,返回一个只包含 KV - RDD 中 key 的新 RDD。在只需要处理键或值的情况下非常有用。例如,在统计某个数据集的键的种类或者值的分布情况时。

功能特点

功能:针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中

分类:转换算子

语法

def keys( self: RDD[Tuple[K,V]] ) -> RDD[K]

举例说明
        rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
        rdd_keys = rdd_kv.keys()
        rdd_keys.foreach(lambda x: print(x)))

values

        values 算子则用于获取所有 rdd 中的 value,返回一个只包含值的新 RDD。在只需要处理键或值的情况下非常有用。例如,在统计某个数据集的键的种类或者值的分布情况时。

功能特点

  - 功能:针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
  - 分类:转换算子
  - 语法
    def values( self: RDD[Tuple[K,V]] ) -> RDD[V]

举例说明
rdd_values = rdd_kv.values()
rdd_values.foreach(lambda x: print(x))

mapValues

        mapValues 算子将所有的 value 拿到之后进行 map 转换,转换后还是元组,只是元组中的 value 发生了变化。例如,对于一个存储用户 ID 和用户年龄的 KV - RDD,我们可以使用 mapValues 算子将每个用户的年龄增加一岁。

功能特点

- 功能:针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
- 分类:转换算子
- 语法 def mapValues(self: RDD[Tuple[K,V]], f: (V) -> U) -> RDD[Tuple[K,U]]

举例说明
    rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
	rsRdd = rdd_kv.mapValues(lambda age: age + 1)
	rsRdd.foreach(lambda x:print(x))

collectAsMap

        collectAsMap 算子将 KV - RDD 转换为一个本地的 Map。需要注意的是,如果 RDD 中有重复的 key,只会保留最后一个 key - value 对。这个算子在需要将键值对数据在本地以 Map 形式处理时很方便,比如配置信息的读取和处理。

功能特点

- 功能:将二元组类型的RDD转换成一个Dict字典
- 分类:触发算子
- 特点:类似于collect,将RDD中元素的结果放入Driver内存中的一个字典中,数据量必须比较小
- 语法
def collectAsMap(self: RDD[Tuple[K,V]]) -> Dict[K,V]

举例说明
    dict = rdd_kv.collectAsMap()
	print(type(dict))
	for k,v in dict.items():
		print(k,v)

KV 类型算子代码演示
import os
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    print(sc)
    rdd_kv = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)], numSlices=2)

    # values 是一个转换算子
    valuesRdd = rdd_kv.values()
    valuesRdd.foreach(print)

    # keys 是一个转换算子,获取所有的key
    rdd_kv.keys().foreach(print)

    # 针对 kv类型的value 重新进行计算
    rdd_kv.mapValues(lambda x:x*2).foreach(print)

    # 以上三个都是 转换算子  collectAsMap 是 触发算子,返回值是一个字典
	# collectAsMap 将一个KV类型的rdd ,快速变为一个 python中的字典类型
    print(rdd_kv.collectAsMap()) # {'laoda': 11, 'laoer': 22, 'laosan': 33, 'laosi': 44}
    # 关闭sc
    sc.stop()

(三)join 方面的算子

join / fullOuterJoin / leftOuterJoin / rightOuterJoin

        这些算子用于对两个 KV 类型的 RDD 进行连接操作。

join

        类似于 SQL 中的内连接,只返回两个 RDD 中 key 相同的元素对。

fullOuterJoin

        返回两个 RDD 中所有元素对,对于没有匹配的 key,相应的值为 None。

leftOuterJoin

        以左边的 RDD 为基础,返回左边 RDD 中所有元素和右边 RDD 中匹配 key 的元素,右边没有匹配的 key 则值为 None。

rightOuterJoin

        与 leftOuterJoin 相反,以右边的 RDD 为基础。这些 join 算子在处理关联数据时非常有用,比如在处理订单数据和用户信息数据时,通过用户 ID 进行连接操作。

功能特点

实现**两个KV类型**的RDD之间按照K实现关联,将两个RDD的关联结果放入一个新的RDD中
def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]]

join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作

举例说明
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)], numSlices= 2)
rdd_singer_music = sc.parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"), ("动力火车", "当")], numSlices=2)

Join算子代码演示
import os
from pyspark import SparkContext, SparkConf


if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
                                    numSlices=2)
    rdd_singer_music = sc.parallelize(
        [("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
         ("动力火车", "当")], numSlices=2)
    # join 是 转换算子  join 可以理解为内连接
    joinRdd = rdd_singer_age.join(rdd_singer_music)
    joinRdd.foreach(print)
    print("*"*100)

    leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
    leftRdd.foreach(print)
    print("*"*100)

    rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
    rightRdd.foreach(print)
    print("*"*100)

    fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
    fullRdd.foreach(print)
	# join 关联的是两个kv类型的rdd
	# union 关联的是单个元素的rdd
    # 关闭sc
    sc.stop()

(四)分区算子

为什么需要分区算子

        在处理大数据集时,有些操作是针对一条数据的(如 map 和 foreach),但当需要对整个数据集进行批量处理时,可能会出现问题。例如,有一个 RDD 读文件产生,有两个分区,每个分区有 50 万条数据,需要将 RDD 的数据进行一对一的处理转换,最后将转换好的结果写入 MySQL。如果直接使用普通的 map 和 foreach 可能会导致资源利用不合理、性能低下等问题。

mapPartitions

        对每个分区的数据作为一个整体进行转换操作。它接受一个函数作为参数,该函数应用于每个分区的数据。例如,在处理数据库连接池时,可以在每个分区内获取一次数据库连接,然后对分区内的所有数据进行处理,最后释放连接,提高数据库连接的使用效率。

功能特点

- 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
- 分类:转换算子
def mapPartitions(self: RDD[T], f: Iterable[T] -> Iterable[U] ) -> RDD[U]

举例说明
  # 使用mapPartitions:对每个分区进行处理
    def map_partition(part):
        rs = [i * 2 for i in part]
        return rs

    # 每个分区会调用一次:将这个分区的数据放入内存,性能比map更好,优化型算子,注意更容易出现内存溢出
    map_part_rdd = input_rdd.mapPartitions(lambda part: map_partition(part))

foreachParition

        对每个分区的数据执行指定的操作,通常用于对分区数据进行输出或写入外部存储等有副作用的操作。与 foreach 类似,但在分区级别执行,可以更好地控制资源和提高效率。

功能特点

- 功能:对RDD每个分区的数据进行操作,将整个分区的数据加载到内存进行foreach处理,没有返回值
- 分类:触发算子
def foreachParition(self: RDD[T] , f: Iterable[T] -> None) -> None

举例说明
    #  使用foreachPartiion:对每个分区进行处理写入MySQL
    def save_part_to_mysql(part):
        # 构建MySQL连接
        for i in part:
            # 利用MySQL连接将结果写入MySQL
            print(i)

    # 将每个分区的数据直接写入MySQL,一个分区就构建一个连接
    map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part))

mapPartitions、foreachParition算子代码演示
import os
from pyspark import SparkContext, SparkConf


def saveToMySQL(partData):
    print("我进来了")
    for i in partData:
        print(i)
def a(partData2):
	return (i * 2 for i in partData2)

if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("分区算子")
    sc = SparkContext(conf=conf)

    # 构建RDD
    input_rdd = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices=2)

	# 触发算子  算子中的函数 没有返回值
    input_rdd.foreachPartition(lambda partition: saveToMySQL(partition))

	# mapPartitions 转换算子  算子中的函数,返回值必须有,并且返回值必须是集合(可迭代类型)
    newRdd = input_rdd.mapPartitions(lambda partition: a(partition))
    newRdd.foreach(print)

    sc.stop()

七、总结

(一)触发算子

count foreach saveAsTextFile first take

collect reduce top takeOrdered

collectAsMap foreachParition max min mean sum

(二)转换算子

map flatMap filter union distinct groupByKey sortByKey sortBy reduceByKey

repartition coalesce keys values mapValues

join fullOuterJoin leftOuterJoin rightOuterJoin

mapPartitions

(三)能触发shuffle过程的算子

groupByKey sortByKey sortBy reduceByKey repartition

coalesce(根据情况) join( fullOuterJoin / leftOuterJoin / rightOuterJoin)

        本文详细介绍了 RDD 的常用基础算子,包括 34 个算子的分类、功能、代码示例以及相关的注意事项。了解这些算子对于熟练使用 Spark 进行大数据处理至关重要。在实际应用中,根据数据处理的需求选择合适的算子可以提高处理效率、减少资源消耗。同时,掌握这些算子的原理和特点对于应对 Spark 相关的面试问题也非常有帮助。无论是数据的转换、触发计算、分组聚合、排序还是其他复杂的操作,RDD 算子都提供了丰富的功能来满足不同的场景需求。在使用过程中,需要注意算子的 lazy 执行模式、Shuffle 过程对性能的影响以及内存的合理使用等问题,以确保数据处理的高效性和稳定性。


http://www.kler.cn/a/391572.html

相关文章:

  • 深度学习中的学习率调度器(scheduler)分析并作图查看各方法差异
  • RPC实现原理,怎么跟调用本地一样
  • 前端开发:表格、列表、表单
  • rk3568 , buildroot , qt ,使用sqlite, 动态库, 静态库
  • LiveNVR监控流媒体Onvif/RTSP常见问题-二次开发接口jquery调用示例如何解决JS|axios调用接口时遇到的跨域问题
  • STM32之LWIP网络通讯设计-下(十五)
  • 最新7+非肿瘤生信,机器学习筛选关键基因+样本验证。目前机器学习已经替代WGCNA成为筛选关键基因方法。非肿瘤生信分析欢迎咨询!
  • 期权懂|期权市场的发展对于投资者有什么影响?
  • AI写作(二)NLP:开启自然语言处理的奇妙之旅(2/10)
  • Ab测试与灰度发布
  • 生成 Django 中文文档 PDF 版
  • 【ESP32】ESP-IDF开发 | 低功耗管理+RTC唤醒和按键唤醒例程
  • 开发工具 IntelliJ IDEA 使用技巧、快捷键、插件分享
  • Flink转换算子
  • CSM32RV20:RISC-V核的低功耗MCU芯片,常用在智能门锁上
  • C++中级学习笔记
  • TortoiseSVN提示服务器凭证检核错误:站点名称不符
  • windows下QT5.12.11使用MSVC编译器编译mysql驱动并使用详解
  • STM32学习笔记------GPIO介绍
  • SpringCloudAlibabaSidecar整合异构微服务
  • ES6模块、CommonJS、AMD等不同的模块化实现。
  • npm i 的时候报错: npm ERR! Error: EPERM: operation not permitted, rename
  • 已解决:spark代码中sqlContext.createDataframe空指针异常
  • 优化Mac的鼠标使用体验超简单方法
  • C++零基础趣味学信息学奥赛系列课程简介
  • 科技云报到:数字化转型,从不确定性到确定性的关键路径