性能优化案例:通过增加 shuffle 分区的数量来解决 PySpark 中的数据倾斜
在 PySpark 应用程序中,在随机操作期间,您可能会遇到由于数据倾斜而导致的性能下降,其中分区之间的数据分布不均匀可能会导致某些节点的处理时间比其他节点长得多。这可能会显著影响作业的执行时间,尤其是当数据集很大或某些键的数据明显多于其他键时。
增加随机分区的数量可以通过减少 Spark 作业中的数据倾斜来显著提高性能。关键是根据工作负载的数据大小和特征为 ‘spark.sql.shuffle.partitions’ 设置找到合适的平衡。通过 Spark Web UI 进行监控并调整其他配置(如 AQE 或广播联接)还可以进一步优化性能并确保您的作业高效运行。
在这种情况下,我们将通过调整 spark.sql.shuffle.partitions’ 参数来增加分区数量,从而解决数据倾斜的问题,这有助于将数据在各个节点之间更均匀地分布,减少倾斜的机会。
第 1 步:了解默认的 shuffle 分区
默认情况下,在 PySpark 中,“spark.sql.shuffle.partitions”的值通常设置为“200”。此设置控制在对数据进行随机排序以进行联接或聚合时要使用的分区数。
例如:
‘’'python
spark.conf.get(“spark.sql.shuffle.partitions”) # 默认值为 200
但是,对于大型数据集,尤其是那些具有数据倾斜或大型聚合的数据集,默认的 shuffle 分区数可能不够。这会导致数据分布不均匀,导致某些分区比其他分区大得多,从而导致 **执行时间偏斜**。
### 第 2 步:识别数据倾斜
要确认数据倾斜是问题所在,您可以使用 **Spark Web UI** 检查 **阶段详细信息** 和 **任务执行时间**。如果您注意到某些任务花费的时间比其他任务长得多,则表明可能存在数据倾斜。
#### 数据倾斜的常见迹象:
- 特定阶段的任务比其他阶段花费的时间长得多。
- 某些分区比其他分区大得多。
- 任务执行时间的差异非常大。
在任务执行期间,您可以通过导航到“http://<driver-node>:4040”来访问 Spark Web UI。查找:
- **阶段**:检查某些任务比其他任务花费的时间长得多的阶段。
- **任务时间**:查看 **任务时间线** 中的任务时间,以查找倾斜迹象。
### 第 3 步:通过增加 shuffle 分区来解决数据倾斜问题
要解决数据倾斜问题,您可以增加 shuffle 分区的数量。这有助于在可用节点之间更均匀地分配数据,确保每个任务都有可管理的工作量。增加的分区允许 Spark 将数据随机排列到更精细的分区中,从而减少任何单个分区的负载。
您可以动态调整 spark.sql.shuffle.partitions 参数,如下所示:
'''python
# 为 shuffle 分区设置更高的值以减轻 skew
spark.conf.set(“spark.sql.shuffle.partitions”, “1000”) # 示例:设置为 1000 个分区
可以根据数据集大小和集群配置选择数字 ‘1000’。您可能需要尝试不同的值来确定工作负载的最佳设置。
为什么这会有帮助:
- 更多分区 = 更多并行度:通过增加随机分区的数量,Spark 会将随机操作分配到更多分区,从而实现更高的并行度并在节点之间平衡工作负载。
- 减少任务时间差异:更多的分区有助于避免某些分区比其他分区保存的数据量大得多的情况,从而减少任务时间差异并加快整体执行速度。
第 4 步:验证解决方案
进行此更改后,请重新运行作业并再次检查 Spark Web UI,以查看偏差是否已缓解:
- 检查任务是否在集群中分布得更均匀。
- 寻找任务执行时间的改进,减少异常值。
- 检查 Storage 和 Shuffle 部分,以验证数据是否在分区之间更均匀地分布。
如果问题仍然存在,您可能需要调整其他因素,例如:
- 优化Join:如果要联接大型数据集,请尝试使用 ‘broadcast()’ 广播较小的数据集。
- 加Salt键:对于某些类型的联接或聚合,加盐键有助于更均匀地分配数据。
第 5 步:调整其他相关参数
除了增加 shuffle 分区之外,其他配置还可以进一步帮助优化 Spark 性能并降低数据倾斜的可能性:
- ’spark.sql.autoBroadcastJoinThreshold’:此参数控制广播较小数据集的阈值。如果数据集小于阈值,Spark 会将其广播到所有 worker 节点,这有时可以缓解倾斜。
‘’'python
spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, “10485760”) # 10MB
- **'spark.sql.adaptive.enabled'**:自适应查询执行 (AQE) 可以在运行时动态调整随机分区大小。启用 AQE 以自动优化 shuffle 分区。
'''python
spark.conf.set(“spark.sql.adaptive.enabled”, “true”)
第 6 步:基准测试和监控
应用这些优化后,您应该通过比较更改前后的 执行时间 和 资源使用情况 来对性能进行基准测试。
- 任务执行时间:检查 Spark 任务的总运行时间,看看增加的分区数量是否缩短了总体执行时间。
- 任务持续时间和分布:使用 Spark UI 监控任务持续时间和分区大小分布,以确保没有单个分区过载。
- 集群资源利用率:检查 CPU、内存和磁盘使用情况,以确保资源得到有效利用。