当前位置: 首页 > article >正文

Spark中的常见算子

1、触发算子

1)count

count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:触发算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int

2) foreach算子

 功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】

 3)saveAsTextFile算子

功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None

 4)first 算子

功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T

 5)take 算子

功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大

举例: [1,2,3,4,5,6,7,8,9]
假如是三个分区:
[1,2,3]
[4,5,6]
[7,8,9]
take(4)     1 2 3 4

 6)collect 算子 --收集,类似于吹哨

collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出

理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。

案例:在sortBy  sortByKey 中,如果不收集就打印的话,此时打印的是每一个分区的结果,为了看到全局排序的结果,此时你需要先collect 再 打印就能看到结果了。

7) reduce算子 --规约,聚集 

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
reduceByKey(lambda tmp,item: tmp+item)

# 一般用于KV键值对的数据
等同于:  select word,sum(value)  from a groub by word;

rdd:1 2 3 4 5 6 7 8 9 10
rdd.reduce(lambda tmp,item: tmp+item) = 55
# 一般用于正常数据
等同于:  select sum(1) * from a ;

# top N
    list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    rdd = sc.parallelize(list01)
    # top 是一个触发算子,不返回rdd类型
    # 为什么 有时 用foreach打印,有时用print 打印
    # 对于转换算子的结果,还是rdd,对于rdd 使用foreach  1) rdd 循环打印  2) foreach 是触发算子
    # 对于触发算子的结果,一般不返回rdd,而是一个正常的返回值,使用print 打印即可
    print(rdd.top(3))
    # takeOrdered 也是一个触发算子,返回排序之后的最小的几个值
    print(rdd.takeOrdered(3))

 8)top算子:求排好序之后的最大的几个值

功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]

 9)takeOrdered : 求排好序之后的最小的几个值

功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]
案例:
print(f01Rdd.takeOrdered(3))

10)collectAsMap 算子

功能:将二元组类型的RDD转换成一个Dict字典
分类:触发算子
场景:当需要将键值对形式的 RDD 数据在 Driver 端整理成一个 Map 结构时使用。
特点:会将整个 RDD 的数据拉取到 Driver 内存中,数据量必须比较小,如果数据量过大可能导致内存溢出。
语法:def collectAsMap (self) -> Dict [K, V]

举例:
dict = rdd_kv.collectAsMap()
    print(type(dict))
    for k,v in dict.items():
        print(k,v)

 11)foreachPartition 算子

功能:对 RDD 的每个分区应用一个给定的函数,在每个分区内独立执行操作。将整个分区的数据加载到内存进行foreach处理,没有返回值
分类:触发算子
场景:当需要在分区级别进行批量操作,如关闭每个分区相关的数据库连接等情况。
特点:在每个分区上执行,相比 foreach 可以减少函数调用的开销。
语法:def foreachPartition (self, f: Callable [[Iterator [T]], None]) -> None

 

12)max 算子

功能:返回 RDD 中的最大值。如果 RDD 中的元素是自定义类型,需要自定义比较规则。
分类:触发算子
场景:获取 RDD 中的最大元素。
特点:会遍历整个 RDD,数据量过大时性能可能受影响。
语法:def max (self) -> T

13)min 算子

功能:返回 RDD 中的最小值。如果 RDD 中的元素是自定义类型,需要自定义比较规则。
分类:触发算子
场景:获取 RDD 中的最小元素。
特点:会遍历整个 RDD,数据量过大时性能可能受影响。
语法:def min (self) -> T

14)mean 算子

功能:计算 RDD 中元素的平均值。要求 RDD 中的元素是数值类型或可以转换为数值类型。
分类:触发算子
场景:需要计算 RDD 数据的平均数值时使用。
特点:需要遍历整个 RDD 进行计算。
语法:不同的实现方式有不同的语法,例如在一些统计库中可能是 def mean (self) -> float

15)sum 算子

功能:计算 RDD 中所有元素的总和。要求 RDD 中的元素是数值类型或可以转换为数值类型。
分类:触发算子
场景:需要计算 RDD 数据的总和时使用。
特点:需要遍历整个 RDD 进行计算。
语法:不同的实现方式有不同的语法,例如在一些数值计算库中可能是 def sum (self) -> float

 2、转换算子

1)map算子

功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果

举例说明:

# 需求:计算每个元素的立方
# 原始数据 1 2 3 4 5 6
# 目标结果 1 8 27 64 125 216

list01 = [1,2,3,4,5,6]
	listRdd = sc.parallelize(list01)
	mapRdd = listRdd.map(lambda x: math.pow(x,3))
	mapRdd.foreach(lambda x: print(x))

2) flatMap算子

功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]

Iterable :传递进来的数据,必须至少是Iterable  ,这个类中要实现 __iter__
Iterator: 迭代器      __next__ 以及 __iter__

判断一个对象是否为可迭代数据类型:
print(isinstance(map(str, [10, 20, 30]), Iterator))  # True

 3)filter算子

功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进
行过滤,符合条件就保留,不符合就过滤
分类:转换算子
场景:行的过滤,类似于SQL中where或者having
 def filter(self, f: T -> bool ) -> RDD[T]

 4)union算子

union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]

 5) distinct算子

功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]

 6)分组聚合算子:groupByKey、 reduceByKey

分类:xxxByKey算子,只有KV类型的RDD才能调用

groupByKey:字面意思是:根据key值进行分组

功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD

语法:RDD【K,V】.groupByKey => RDD【K, List[V]】

最终的结果可能是:

hive [1,3]

分类:转换算子

场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce

特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则

def groupByKey(self, numpartitions, partitionFunction) ->RDD[Tuple[K,Iterable[V]]]

举例:统计单词出现次数,每个单词不聚合,只展示1,比如:

spark ------> 1 1 1 1 1 1 1 1 1 1

hbase ------> 1 1 1 1 1 1

hadoop ------> 1 1 1 1 1 1 1

hive ------> 1 1 1

hue ------> 1 1 1 1 1 1 1 1 1

reduceByKey: 根据key值,进行合并计算

reduceByKey算子
功能:对KV类型的RDD按照Key进行分组,并对相同Key的所有
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才能调用
分类:转换算子
特点:必须经过shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]

7)排序算子:sortBy、sortByKey 

sortBy算子:

功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序

 sortByKey算子: 

功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
使用这个算子,还想得到以上的需求的结果,必须让年龄为key

8) 重分区算子:repartition、coalesce

repartition算子:

功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]

coalesce算子:

功能:调整RDD的分区个数
分类:转换算子

特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]

9)keys算子 : 获取所有的key 

功能:针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中

分类:转换算子

语法
    def keys( self: RDD[Tuple[K,V]] ) -> RDD[K]
  - 示例

        rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
        rdd_keys = rdd_kv.keys()
        rdd_keys.foreach(lambda x: print(x))

10)values算子 : 获取所有rdd中的value 

- - 功能:针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
  - 分类:转换算子
  - 语法
    def values( self: RDD[Tuple[K,V]] ) -> RDD[V]
  - 示例
        rdd_values = rdd_kv.values()
        rdd_values.foreach(lambda x: print(x))

 11)mapValues算子:

将所有的value拿到之后进行map转换,转换后还是元组,只是元组中的value,进行了变化

- 功能:针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
- 分类:转换算子
- 语法 def mapValues(self: RDD[Tuple[K,V]], f: (V) -> U) -> RDD[Tuple[K,U]]
举例:
    rdd_kv = sc.parallelize([('laoda',11),('laoer',22),('laosan',33),('laosi',44)], numSlices=2)
    rsRdd = rdd_kv.mapValues(lambda age: age + 1)
    rsRdd.foreach(lambda x:print(x)) 

12)join方面的算子 :

join / fullOuterJoin / leftOuterJoin / rightOuterJoin

假如是两个list聚合的算子,合并union,如果是KV类型的join

实现**两个KV类型**的RDD之间按照K实现关联,将两个RDD的关联结果放入一个新的RDD中
def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]]

join的过程,必然引发相同key值的数据汇总在一起,引发shuffle 操作

 举例说明:

import os
from pyspark import SparkContext, SparkConf

"""
------------------------------------------
  Description : 
  SourceFile : Join
  Author  : dcc
  Date  : 2024/10/31
-------------------------------------------
"""

if __name__ == '__main__':
    # 设置 任务的环境变量
    os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取sc 对象
    conf = SparkConf().setMaster("local[2]").setAppName("其他转换算子")
    sc = SparkContext(conf=conf)
    rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)],
                                    numSlices=2)
    rdd_singer_music = sc.parallelize(
        [("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),
         ("动力火车", "当")], numSlices=2)
    # join 是 转换算子  join 可以理解为内连接
    joinRdd = rdd_singer_age.join(rdd_singer_music)
    joinRdd.foreach(print)

    print("*"*100)
    leftRdd = rdd_singer_age.leftOuterJoin(rdd_singer_music)
    leftRdd.foreach(print)
    print("*"*100)
    rightRdd = rdd_singer_age.rightOuterJoin(rdd_singer_music)
    rightRdd.foreach(print)
    print("*"*100)
    fullRdd = rdd_singer_age.fullOuterJoin(rdd_singer_music)
    fullRdd.foreach(print)
	# join 关联的是两个kv类型的rdd
	# union 关联的是单个元素的rdd
    # 关闭sc
    sc.stop()


join展示结果:
('陈奕迅', (47, '孤勇者'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('林子祥', (74, '男儿当自强'))
********left join 显示结果*******************************************************
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('陈升', (63, None))
('陈奕迅', (47, '孤勇者'))
('林子祥', (74, '男儿当自强'))
*********right join 显示结果************************************
('动力火车', (None, '当'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('林子祥', (74, '男儿当自强'))
('陈奕迅', (47, '孤勇者'))
********full join 显示结果*********************************************
('动力火车', (None, '当'))
('周杰伦', (43, '青花瓷'))
('蔡依林', (41, '日不落'))
('陈升', (63, None))
('陈奕迅', (47, '孤勇者'))
('林子祥', (74, '男儿当自强'))

 13)mapPartitions算子

- 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
- 分类:转换算子
def mapPartitions(self: RDD[T], f: Iterable[T] -> Iterable[U] ) -> RDD[U]

 3、哪些算子能触发shuffle过程:

  1)分组聚合算子:groupByKey、 reduceByKey

  2)排序算子:sortBy、sortByKey 

  3)重分区算子:repartition、coalesce(根据情况)

  4)join方面的算子 :join / fullOuterJoin / leftOuterJoin / rightOuterJoin


http://www.kler.cn/a/375591.html

相关文章:

  • PostgreSQL 到 PostgreSQL 数据迁移同步
  • Hadoop期末复习(完整版)
  • 如何实现LRU缓存淘汰算法?
  • 期权懂|开通ETF股票期权需要什么条件?ETF股票期权佣金是多少?
  • 代码随想录算法训练营第三十三天|Day33 动态规划
  • Java Executor ScheduledExecutorService 源码
  • ubuntu22-安装vscode-配置shell命令环境-mac安装
  • Serverless + AI 让应用开发更简单
  • 报错:npm : 无法加载文件 C:\Program Files\nodejs\npm.ps1,因为在此系统上禁止运行脚本。
  • vue3-element-admin 去掉登录
  • Docker Compose入门学习——下载、授权、创建文件、定义服务
  • 创建一个基于SSM(Spring, Spring MVC, MyBatis)的教学视频点播系统
  • Sigrity Power SI Multiple Structure Simulation模式如何进行跨板级联仿真操作指导(一)
  • npm install -g @vue/cil 非常卡慢
  • linux alsa-lib snd_pcm_open函数源码分析(一)
  • 腾讯云数据库TDSQL:数据库界的“高架桥”
  • 【论文阅读】Associative Alignment for Few-shot Image Classification
  • ESP-IDF HTTP POST请求发送音频-启明云端乐鑫代理商
  • 【机器学习】21. Transformer: 最通俗易懂讲解
  • 优化低代码开发平台用户体验:功能树导航设计探讨
  • 穷举vs暴搜vs深搜vs回溯vs剪枝 算法专题
  • #渗透测试#SRC漏洞挖掘# 操作系统-windows系统
  • C++设计模式结构型模式———桥接模式
  • watch 和 computed 的区别 - 2024最新版前端秋招面试短期突击面试题【100道】
  • MySQL锁——针对实习面试
  • 【华为HCIP实战课程二十九】中间到中间系统协议IS-IS邻居关系建立和LSP详解,网络工程师