当前位置: 首页 > article >正文

深入理解 Spark 中的 Shuffle

 Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

PySpark 数据处理实战:从基础操作到案例分析-CSDN博客

Spark 的容错机制:保障数据处理的稳定性与高效性-CSDN博客

Spark 共享变量:广播变量与累加器全解析-CSDN博客

Spark 核心概念与宽窄依赖的详细解析-CSDN博客

        在 Spark 的大数据处理世界里,Shuffle 是一个极为关键的概念。它在处理分布式大数据量的全局分组、全局排序以及重新分区等任务时起着核心作用。本文将深入探讨 Spark 中的 Shuffle,包括其设计理念、产生 Shuffle 的算子、不同类型的 Shuffle 及其特点、优化以及相关的钨丝计划等内容。

一、Spark 的 Shuffle 设计

        Spark Shuffle 过程又被称为宽依赖过程。与一些完全依赖内存计算的模式不同,Spark 在面临诸如全局分组、排序和重新分区等问题时,需要借助 Shuffle 过程在磁盘划分的基础上进行处理,以实现高效的数据处理与分布式计算。

二、产生 Shuffle 的算子

        在 Spark 中,有一些特定的算子会引发 Shuffle 操作。例如,涉及到数据重新分区的算子,像 repartition 等;还有基于键值对进行聚合操作的算子,如 reduceByKey 等,这些算子在执行过程中往往需要对数据进行全局的重新组织和处理,从而触发 Shuffle 过程。

只要这个算子包含以下三个功能之一:必须经过Shuffle
大数据量全局分组:reduceByKey、groupByKey
大数据量全局排序:sortBy、sortByKey
大数据量增大分区:repartition、coalesce 

还有一个:join

groupByKey sortByKey sortBy reduceByKey repartition
coalesce(根据情况)  join / fullOuterJoin / leftOuterJoin / rightOuterJoin

三、Spark 中的 Shuffle 分类

Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle直接使用堆外内存和新的内存管理模型,节省了内存空间和大量的gc,提升了性能
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台

(一)Hash Based Shuffle

  1. 特点:这种 Shuffle 方式主要侧重于分区操作,不会对数据进行排序。每个 Task 会依据 ReduceTask 的个数生成多个文件,其数量为 M * R(M 为 MapTask 数量,R 为 ReduceTask 数量)。
  2. 优点:在数据量较小的情况下,由于其设计相对简单,不需要复杂的排序逻辑,所以性能表现较为出色。
  3. 缺点:当数据量增大时,会产生大量的小文件,这不仅会占用大量的磁盘空间,还会导致磁盘 I/O 性能急剧下降,从而使得整体性能非常差。

(二)Hash Based Shuffle(优化后的 File Consolidation 机制)

        在引入 File Consolidation 机制后,例如有两个 Executor 和 4 个 ReduceTask 的场景下,以前如果是 4 个 map 和 4 个 reduce 会形成 16 个文件,而现在仅生成 8 个文件。这种优化机制通过整合 Executor 的资源,在一定程度上减少了小文件的数量,缓解了磁盘压力,提升了性能。

(三)Sort Based Shuffle(目前最新的)

1)Shuffle Write

        类似于 MapReduce 中的 Map 端 Shuffle,但 Spark 的 Shuffle Write 有 3 种方式,分别是 SortShuffleWriter、BypassMergeSortShuffleWriter 和 UnsafeShuffleWriter,Spark 2 以后会根据情况自动判断选择哪种 Shuffle Write。

1.SortShuffleWriter(普通Sort Shuffle Write机制

排序,生成一个整体基于分区和分区内部有序的文件和一个索引文件
大多数场景:数据量比较大场景  与MR的Map端Shuffle基本一致
特点:有排序,先生成多个有序小文件,再生成整体有序大文件,每个Task生成2个文件,数据文件和索引文件
Sort Shuffle Write过程与MapReduce的Map端shuffle基本一致

2.BypassMergeSortShuffleWriter

类似于优化后的Hash Based Shuffle,先为每个分区生成一个文件,最后合并为一个大文件,分区内部不排序
条件:分区数小于200,并且Map端没有聚合操作
场景:数据量小

跟第一个相比,处理的数据量小,处理的分区数小于200 ,不在内存中排序。

3.UnsafeShuffleWriter

钨丝计划方案,使用UnSafe API操作序列化数据,使用压缩指针存储元数据,溢写合并使用fastMerge提升效率

条件:Map端没有聚合操作、序列化方式需要支持重定位,Partition个数不能超过2^24-1个

4.在什么情况下使用什么ShuffleWriter 呢?

ShuffleWriter的实现方式有三种:

BypassMergeSortShuffleWriter

使用这种shuffle writer的条件是:

(1) 没有map端的聚合操作
(2) 分区数小于参数:spark.shuffle.sort.bypassMergeThreshold,默认是200

UnsafeShuffleWriter

使用这种shuffle writer的条件是:

(1) 序列化工具类支持对象的重定位
(2) 不需要在map端进行聚合操作
(3) 分区数不能大于:PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

SortShuffleWriter

若以上两种shuffle writer都不能选择,则使用该shuffle writer类。
这也是相对比较常用的一种shuffle writer。

2)Shuffle Read

        类似于 MapReduce 中的 Reduce 端 Shuffle,但在 Spark 中其功能由算子来决定。例如,reduceByKey 的 Shuffle Read 只进行分组聚合,不排序;sortByKey 只排序,不分组;repartition 则既不排序也不分组。

四、钨丝计划(Tungsten)

        Tungsten 聚焦于 CPU 和 Memory 使用,旨在深度挖掘分布式硬件的潜能。由于 Spark 运行在 JVM 平台,而 JVM 的垃圾回收(GC)机制在一定程度上限制了 Spark 的性能。Tungsten 采用了 off - heap(堆外内存)技术,也就是使用 JVM 之外的内存空间,实现了自己独立的内存管理。这样就有效避免了 JVM 的 GC 引发的性能问题,同时还减少了序列化和反序列化的开销。例如,UnsafeShuffleWriter 会将数据序列化,放入缓冲区进行排序,排序结束后 Spill 到磁盘,最终合并 Spill 文件为一个大文件,并且在内存存储时使用了 Java 的 Unsafe API,这就是钨丝计划在 Shuffle 过程中的具体体现。

五、总结

        Spark 中的 Shuffle 是其分布式计算体系中的关键环节。不同类型的 Shuffle 有着各自的特点和适用场景,从早期的 Hash Based Shuffle 到如今的 Sort Based Shuffle,以及不断演进的优化机制和钨丝计划的融入,都体现了 Spark 在追求高性能分布式数据处理道路上的持续探索和创新。深入理解 Spark 的 Shuffle 机制对于优化大数据处理任务、提升系统性能具有极为重要的意义,无论是对于 Spark 开发者还是大数据从业者,都值得深入研究和掌握。


http://www.kler.cn/a/392790.html

相关文章:

  • 深度学习之卷积问题
  • MySQL系列之如何在Linux只安装客户端
  • 【OceanBase 诊断调优】—— ocp上针对OB租户CPU消耗计算逻辑
  • Flink_DataStreamAPI_输出算子Sink
  • GISBox VS ArcGIS:分别适用于大型和小型项目的两款GIS软件
  • react 中 FC 模块作用
  • 不同规模的企业需要部署哪种组网?
  • 【Goland】——Gin 框架简介与安装
  • yolo标签自动标注(使用python和yolo方法)
  • 031集——获取外轮廓(只支持线段多段线)(CAD—C#二次开发入门)
  • 海思Hi3516DV300上播放G711U音频文件
  • 【Hadoop】【hdfs】【大数据技术基础】实验三 HDFS 基础编程实验
  • 【监控】如何调出电脑的中摄像头,从摄像头获取视频流
  • STM32完全学习——点亮LED灯
  • C#发票识别、发票查验接口集成、电子发票(航空运输电子行程单)
  • 【再谈设计模式】抽象工厂模式~对象创建的统筹者
  • Python酷库之旅-第三方库Pandas(214)
  • 利用编程语言和脚本编写技术,实现自动化渗透测试和安全工具的开发
  • Llama微调测试记录
  • Go 加密算法工具方法
  • 嵌入式linux系统中RTC硬件的控制与实现
  • Go语言入门教案
  • 【vue】toRefs 和 toRef——如何在解构响应式对象时保持响应性
  • 免费,WPS Office教育考试专用版
  • 【初阶数据结构篇】插入、希尔、选择、堆排序
  • 约束(MYSQL)