Spark中的常见算子
1、触发算子
1)count
count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:触发算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int
2) foreach算子
功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】
3)saveAsTextFile算子
功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None
4)first 算子
功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T
5)take 算子
功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大举例: [1,2,3,4,5,6,7,8,9]
假如是三个分区:
[1,2,3]
[4,5,6]
[7,8,9]
take(4) 1 2 3 4
6)collect 算子 --收集,类似于吹哨
collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。
案例:在sortBy sortByKey 中,如果不收集就打印的话,此时打印的是每一个分区的结果,为了看到全局排序的结果,此时你需要先collect 再 打印就能看到结果了。
7) reduce算子 --规约,聚集
功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
reduceByKey(lambda tmp,item: tmp+item)# 一般用于KV键值对的数据
等同于: select word,sum(value) from a groub by word;rdd:1 2 3 4 5 6 7 8 9 10
rdd.reduce(lambda tmp,item: tmp+item) = 55
# 一般用于正常数据
等同于: select sum(1) * from a ;
# top N
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
rdd = sc.parallelize(list01)
# top 是一个触发算子,不返回rdd类型
# 为什么 有时 用foreach打印,有时用print 打印
# 对于转换算子的结果,还是rdd,对于rdd 使用foreach 1) rdd 循环打印 2) foreach 是触发算子
# 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
print(rdd.top(3))
# takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
print(rdd.takeOrdered(3))
8)top算子:求排好序之后的最大的几个值
功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]
9)takeOrdered : 求排好序之后的最小的几个值
功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]
案例:
print(f01Rdd.takeOrdered(3))
10)collectAsMap 算子
功能:将二元组类型的RDD转换成一个Dict字典
分类:触发算子
场景:当需要将键值对形式的 RDD 数据在 Driver 端整理成一个 Map 结构时使用。
特点:会将整个 RDD 的数据拉取到 Driver 内存中,数据量必须比较小,如果数据量过大可能导致内存溢出。
语法:def collectAsMap (self) -> Dict [K, V]举例:
dict = rdd_kv.collectAsMap()
print(type(dict))
for k,v in dict.items():
print(k,v)
11)foreachPartition 算子
功能:对 RDD 的每个分区应用一个给定的函数,在每个分区内独立执行操作。将整个分区的数据加载到内存进行foreach处理,没有返回值
分类:触发算子
场景:当需要在分区级别进行批量操作,如关闭每个分区相关的数据库连接等情况。
特点:在每个分区上执行,相比 foreach 可以减少函数调用的开销。
语法:def foreachPartition (self, f: Callable [[Iterator [T]], None]) -> None
12)max 算子
功能:返回 RDD 中的最大值。如果 RDD 中的元素是自定义类型,需要自定义比较规则。
分类:触发算子
场景:获取 RDD 中的最大元素。
特点:会遍历整个 RDD,数据量过大时性能可能受影响。
语法:def max (self) -> T
13)min 算子
功能:返回 RDD 中的最小值。如果 RDD 中的元素是自定义类型,需要自定义比较规则。
分类:触发算子
场景:获取 RDD 中的最小元素。
特点:会遍历整个 RDD,数据量过大时性能可能受影响。
语法:def min (self) -> T
14)mean 算子
功能:计算 RDD 中元素的平均值。要求 RDD 中的元素是数值类型或可以转换为数值类型。
分类:触发算子
场景:需要计算 RDD 数据的平均数值时使用。
特点:需要遍历整个 RDD 进行计算。
语法:不同的实现方式有不同的语法,例如在一些统计库中可能是 def mean (self) -> float
15)sum 算子
功能:计算 RDD 中所有元素的总和。要求 RDD 中的元素是数值类型或可以转换为数值类型。
分类:触发算子
场景:需要计算 RDD 数据的总和时使用。
特点:需要遍历整个 RDD 进行计算。
语法:不同的实现方式有不同的语法,例如在一些数值计算库中可能是 def sum (self) -> float
2、转换算子
1)map算子
功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果
举例说明:
# 需求:计算每个元素的立方
# 原始数据 1 2 3 4 5 6
# 目标结果 1 8 27 64 125 216
list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))
2) flatMap算子
功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]Iterable :传递进来的数据,必须至少是Iterable ,这个类中要实现 __iter__
Iterator: 迭代器 __next__ 以及 __iter__判断一个对象是否为可迭代数据类型:
print(isinstance(map(str, [10, 20, 30]), Iterator)) # True
3)filter算子
功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进
行过滤,符合条件就保留,不符合就过滤
分类:转换算子
场景:行的过滤,类似于SQL中where或者having
def filter(self, f: T -> bool ) -> RDD[T]
4)union算子
union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]
5) distinct算子
功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]
6)分组聚合算子:groupByKey、 reduceByKey
分类:xxxByKey算子,只有KV类型的RDD才能调用
groupByKey:字面意思是:根据key值进行分组
功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD
语法:RDD【K,V】.groupByKey => RDD【K, List[V]】
最终的结果可能是:
hive [1,3]
分类:转换算子
场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce
特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则
def groupByKey(self, numpartitions, partitionFunction) ->RDD[Tuple[K,Iterable[V]]]
举例:统计单词出现次数,每个单词不聚合,只展示1,比如:
spark ------> 1 1 1 1 1 1 1 1 1 1
hbase ------> 1 1 1 1 1 1
hadoop ------> 1 1 1 1 1 1 1
hive ------> 1 1 1
hue ------> 1 1 1 1 1 1 1 1 1
reduceByKey: 根据key值,进行合并计算
reduceByKey算子
功能:对KV类型的RDD按照Key进行分组,并对相同Key的所有
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才能调用
分类:转换算子
特点:必须经过shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]
7)排序算子:sortBy、sortByKey
sortBy算子:
功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序
sortByKey算子:
功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
使用这个算子,还想得到以上的需求的结果,必须让年龄为key
8) 重分区算子:repartition、coalesce
repartition算子:
功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]
coalesce算子:
功能:调整RDD的分区个数
分类:转换算子特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]
9)keys算子 : 获取所有的key
功能:针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中
分类:转换算子
语法
def keys( self: RDD[Tuple[K,V]] ) -> RDD[K]
- 示例rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
rdd_keys = rdd_kv.keys()
rdd_keys.foreach(lambda x: print(x))
10)values算子 : 获取所有rdd中的value
- - 功能:针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
- 分类:转换算子
- 语法
def values( self: RDD[Tuple[K,V]] ) -> RDD[V]
- 示例
rdd_values = rdd_kv.values()
rdd_values.foreach(lambda x: print(x))
11)mapValues算子:
将所有的value拿到之后进行map转换,转换后还是元组,只是元组中的value,进行了变化
- 功能:针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
- 分类:转换算子
- 语法 def mapValues(self: RDD[Tuple[K,V]], f: (V) -> U) -> RDD[Tuple[K,U]]
举例:
rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
rsRdd = rdd_kv.mapValues(lambda age: age + 1)
rsRdd.foreach(lambda x:print(x))
12)join方面的算子 :
join / fullOuterJoin / leftOuterJoin / rightOuterJoin
假如是两个list聚合的算子,合并union,如果是KV类型的join
实现**两个KV类型**的RDD之间按照K实现关联,将两个RDD的关联结果放入一个新的RDD中
def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]]join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作
举例说明:
import os
from pyspark import SparkContext, SparkConf
"""
------------------------------------------
Description :
SourceFile : Join
Author : dcc
Date : 2024/10/31
-------------------------------------------
"""
if __name__ == '__main__':
# 设置 任务的环境变量
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
# 获取sc 对象
conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
sc = SparkContext(conf=conf)
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
numSlices=2)
rdd_singer_music = sc.parallelize(
[("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
("动力火车", "当")], numSlices=2)
# join 是 转换算子 join 可以理解为内连接
joinRdd = rdd_singer_age.join(rdd_singer_music)
joinRdd.foreach(print)
print("*"*100)
leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
leftRdd.foreach(print)
print("*"*100)
rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
rightRdd.foreach(print)
print("*"*100)
fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
fullRdd.foreach(print)
# join 关联的是两个kv类型的rdd
# union 关联的是单个元素的rdd
# 关闭sc
sc.stop()
join展示结果:
('陈奕迅', (47, '孤勇者'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('林子祥', (74, '男儿当自强'))
********left join 显示结果*******************************************************
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('陈升', (63, None))
('陈奕迅', (47, '孤勇者'))
('林子祥', (74, '男儿当自强'))
*********right join 显示结果************************************
('动力火车', (None, '当'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('林子祥', (74, '男儿当自强'))
('陈奕迅', (47, '孤勇者'))
********full join 显示结果*********************************************
('动力火车', (None, '当'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('陈升', (63, None))
('陈奕迅', (47, '孤勇者'))
('林子祥', (74, '男儿当自强'))
13)mapPartitions算子:
- 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
- 分类:转换算子
def mapPartitions(self: RDD[T], f: Iterable[T] -> Iterable[U] ) -> RDD[U]
3、哪些算子能触发shuffle过程:
1)分组聚合算子:groupByKey、 reduceByKey
2)排序算子:sortBy、sortByKey
3)重分区算子:repartition、coalesce(根据情况)
4)join方面的算子 :join / fullOuterJoin / leftOuterJoin / rightOuterJoin