当前位置: 首页 > article >正文

spark的学习-04

Spark中的共享变量:

1、广播变量:

Broadcast Variables广播变量

        sc = SparkContext(conf=conf)

        fileRdd = sc.textFile("../datas/user.tsv",2)
        city_dict = {
                1: "北京",
                2: "上海",
                3: "广州",
                4: "深圳",
                5: "苏州",
                6: "无锡",
                7: "重庆",
                8: "厦门",
                9: "大理",
                10: "成都"
        }
        # 将一个变量广播出去,广播到executor中,不是task中
        city_dict_broad = sc.broadcast(city_dict)
        def getLine(line):
                list01 = line.split(" ")
                #cityName = city_dict.get(int(list01[3]))
                # 使用广播变量的变量获取数据
                cityName = city_dict_broad.value.get(int(list01[3]))
                # print(cityName)
                return line + " " + cityName
        mapRdd = fileRdd.map(getLine)
        mapRdd.foreach(print)

        # 释放广播变量
        city_dict_broad.unpersist()
        # 使用完后,记得关闭
        sc.stop()

功能将一个变量元素进行广播到每台Worker节点的Executor中,让每个Task直接从本地读取数据,减少网络传输IO

广播变量本质是一种优化手段,不做也是可以的,做了可以提高性能

在使用两张表进行join的时候,将小表进行广播,再与大表中进行join,避免了shuffle过程,提高效率

广播变量是一个只读变量,只能取不能更改

2、累加器

来一个需求:对搜狗日志的数据进行处理,统计10点搜索的数据一共有多少条

书写代码如下:

mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
 .filter(lambda line:len(re.split("\s+",line)) == 6) \
 .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

# 统计一天每小时点击量并按照点击量降序排序
_sum = 0
def getNum(tup):
    global _sum
    if tup[0][0:2] == "10":
        print("进去了")
        _sum += 1
mapRdd.foreach(getNum)
print(f"10点钟的搜索条数是{_sum}")  # 0

为什么明明有符合条件的值,得到的结果却是0呢?

原因:

因为我们在这里使用的是python的方法,而不是spark中的,spark的分布式的

而且每个Task到Driver中取了sum的副本,进行累加,但是这个累加的结果并没有返回到Driver中

global: 将变量设置为全局变量

补充: global 设置为全局变量 假如有一个变量a a = 0 Def xxx(): a+=1 这时 a+=1这里识别不到a 就需要使用global 将a设为全局变量,之后就可以接着进行操作了

Job:

job是Spark触发计算的最小的单元

由Driver解析代码,遇到了触发算子,就会构建job,一个代码可以有多个触发算子,一个Spark程序可以包含多个job

Stage:

Stage是Spark转换Task的最小单元

设计:每个Stage内部都是由Task直接在内存中转换完成,不同 Stage之间需要经过磁盘来完成

Task:

Task是Spark执行计算的最小单元

TaskSet集合中Task的个数由这个Stage中最后一个RDD的分区数 或者 最小的RDD分区数来决定

Spark的依赖关系:

窄依赖:一对一 不经过shuflle

定义:父RDD的一个分区的数据只给了子RDD的一个分区 【不用经过Shuffle】

特点:一对一或者多对一不经过Shuffle,性能相对较快, 但无法实现全局分区、排序、分组等

一个Stage内部的计算都是窄依赖的过程,全部在内存中 完成。

宽依赖: 一对多 经过shuffle

定义:父RDD的一个分区的数据给了子RDD的多个分区【需要调用Shuffle的分区器来实现】

特点:一对多,必须经过Shuffle,性能相对较慢,可以实现全 局分区、排序、分组等

Spark的job中按照宽依赖来划分Stage

本质:只是一种标记,标记两个RDD之间的依赖关系

面试题:为什么要表明宽窄依赖

1) 可以提高数据容错的性能,避免数据丢失的时候,还需要再次重建整个RDD,在窄依赖中 直接通过父子RDD的对应关系回复即可 
2) 提高数据转换的性能,将连续窄依赖操作使用同一个Task都放在内存中直接转换

场景:如果子RDD的某个分区的数据丢失

不标记:不清楚父RDD与子RDD数据之间的关系,必须重新构建整个父RDD所有数据

标记了:父RDD一个分区只对应子RDD的一个分区,按照对应关系恢复父RDD的对应分区即可

spark的shuffle过程:

1、Hash based shuffle

特点:

没有排序,只分区,数据量小的时候性能高,数据量大时,性能非常差

2、优化后的Hash based shuffle

还是不排序,只分区,但是进步在于,如果是以前 4 个 map 4 个 reduce 形成 16 个文件,现在引入 executor 以后,生成 8 个文件。

3、Sort Based Shuffle [目前最新的]

分为shuffle write 和 shuffle read

shuffle write:类似于MR中的Map端,但是Spark的 Shuffle Write有3种,会根据情况自动判断选择哪种Shuffle Write

Shuffle read:类似于MR中的Reduce端,功能完全由算子决定

Spark 2以后的Shuffle Write判断机制:

第一种:SortShuffleWriter:普通Sort Shuffle Write机制

排序,生成一个整体基于分区和分区内部有序的文件和一个索引文件

大多数场景:数据量比较大场景 与MR的Map端Shuffle基本一致

特点:有排序,先生成多个有序小文件,再生成整体有序大文件,每个Task生成2个文件,数据文件和索引文件

Sort Shuffle Write过程与MapReduce的Map端shuffle基本一致

第二种:BypassMergeSortShuffleWriter

类似于优化后的Hash Based Shuffle,先为每个分区生成一个文件,最后合并为一个大文件,分区内部不排序

条件:分区数小于200,并且Map端没有聚合操作

场景:数据量小

跟第一个相比,处理的数据量小,处理的分区数小于200 ,不在内存中排序。

第三种:UnsafeShuffleWriter

钨丝计划方案,使用UnSafe API操作序列化数据,使用压缩指针存储元数据,溢写合并使用fastMerge提升效率

条件:Map端没有聚合操作、序列化方式需要支持重定位,Partition个数不能超过2^24-1个


http://www.kler.cn/a/390530.html

相关文章:

  • 使用 Visual Studio Installer 彻底卸载 Visual Studio方法与下载
  • 《情商》提升:增强自我意识,学会与情绪共处
  • android dvr黑屏
  • vue el-date-picker 日期选择器禁用失效问题
  • 客户案例 | 如何利用Ansys工具提供互联系统(以及系统的系统),从而使“软件定义汽车”成为可能
  • Vue常用加密方式
  • 人工智能在智能家居中的应用
  • 【分布式事务】二、NET8分布式事务实践: DotNetCore.CAP 框架 、 消息队列(RabbitMQ)、 多类型数据库(MySql、MongoDB)
  • cmake同名无法创建(已解决,未深入探究)
  • Spring MVC 面试常问问题
  • 第三百二十一节 Java线程教程 - Java线程状态、Java原子变量
  • 2024.11最新Hexo+GitHub搭建个人博客
  • 网络安全渗透测试的相关理论和工具
  • dhcp和ftp
  • 设计模式之模版方法模式(Template)
  • 在CentOS下安装RabbitMQ
  • Nginx、Gateway的区别
  • TCP 三次握手意义及为什么是三次握手
  • 基于Testng + Playwright的H5自动化巡检工具
  • 【含文档】基于Springboot+Vue的生鲜团购系统 (含源码数据库+lw)
  • Arrays.sort与Collections.sort:深入解析Java中的排序算法
  • PySpark 数据处理实战:从基础操作到案例分析
  • 开源 - Ideal库 -获取特殊时间扩展方法(三)
  • MySQL 中单列索引与联合索引分析
  • SCI论文为何有“Online版”和“正式出版”?这两者有什么区别?
  • 字符函数和字符串函数(函数的模拟实现请前往gitte获取源代码)(文章结尾有链接)