当前位置: 首页 > article >正文

Sparksql 动态shuffle partition

背景

spark版本2.4.6

在spark sql中shuffle的partition数量由spark.sql.shuffle.partitions决定。

spark.sql.shuffle.partitions默认是200

shuffle并行度定死某一个数值(如200),会存在一些问题。因为每次shuffle的数据量是不确定。当数据量小的时候,每一个shuffle的task处理的数据量太少(如下图)。当数据量大的时候,每一个shuffle处理的数量又太多。所以需要根据shuffle前文件大小来动态调节shuffle的并行度。

动态shuffle partition

主要参数有三个:

  • spark.sql.adaptive.enabled:开启adaptive功能,默认false
  • spark.sql.adaptive.shuffle.targetPostShuffleInputSize:shuffle单个partition输入文件大小,默认64M
  • spark.sql.adaptive.minNumPostShufflePartitions:shuffle的partition最小数量,默认-1

开启动态shuffle,需要设置spark.sql.adaptive.enabled=true,开启后shuffle的并行度变成了有200变成了1

可以通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize来调整shuffle的并行度。如下图spark.sql.adaptive.shuffle.targetPostShuffleInputSize设置为500kb,shuffle的数据大小为1034.6kb。并行度=ceil(1034.6/500)=3

也可以通过spark.sql.adaptive.minNumPostShufflePartitions调整shuffle并行度。如下图设置spark.sql.adaptive.minNumPostShufflePartitions为10,shuffle并行度为11(满足最小为10)

shuffle并行度确定流程(源码)

暂时无法在飞书文档外展示此内容

举例说明:

源码在ExchangeCoordinator类中

map输出文件大小为1034.6KB()

spark.sql.adaptive.enabled=true

spark.sql.adaptive.shuffle.targetPostShuffleInputSize=500KB

spark.sql.adaptive.minNumPostShufflePartitions=10

文件大小totalPostShuffleInputSize=1112175,除以10,得到maxPostShuffleInputSize=111218,跟500KB取最小值,targetPostShuffleInputSize最终等于111218

map输出的文件已经按照默认分区数200进行了分组。mapOutputStatistics中是map task生成文件中200个分区对应的大小。遍历200个分区,从mapOutputStatistics中获取全部map输出文件中对应partition的数据大小,累加直到targetPostShuffleInputSize生成一个合并分区。最后写入partitionStartIndices中。

partitionStartIndices中保存的是原始200分区对应的索引值,表示合并后partition的起始位置。

最后将200个分区合并成了11个分区。

分别对应处理

0-19 20-38 39-57 58-77 78-97 98-117 118-136 137-154 156-175 176-194 195-200


http://www.kler.cn/news/362785.html

相关文章:

  • RootNeighboursDataset(helpers.dataset_classes文件中的root_neighbours_dataset.py)
  • Git 文件大小写混乱?
  • 如何用mmclassification训练多标签多分类数据
  • 海南聚广众达电子商务咨询有限公司靠谱吗怎么样?
  • Android——FileProvider
  • 对接金蝶云星空存货档案到MES系统的详细步骤及javajs动态脚本拉取的实现
  • 写了一个SpringBoot的后端管理系统(仅后端)pine-manage-system
  • 【NodeJS】NodeJS+mongoDB在线版开发简单RestfulAPI (四):状态码的使用
  • 软件测试与软件缺陷的基础知识
  • Triton语言:机器学习领域的新星,能否挑战CUDA的霸主地位? ​​​​​​​
  • Zookeeper面试整理-分布式系统知识
  • Oracle OCP认证考试考点详解082系列01
  • perl统一修改文件前缀并排序
  • Embedding实现GPT回答和知识库内容相关的内容
  • LabVIEW继电器视觉检测系统
  • CSS3文本阴影、文本换行、文本溢出、文本修饰、文本描边的使用
  • 项目打包不同环境
  • 【D3.js in Action 3 精译_036】4.1 DIY 实战:在 Observable 平台实现 D3折线图坐标轴的绘制
  • AudioSegment 提高音频音量 - python 实现
  • 消息队列(仿RabbitMQ)—— 生产消费模型
  • 钉钉消息推送工具类
  • 使用皮尔逊相关系数矩阵进行特征筛选
  • Windows系统启动MongoDB报错无法连接服务器
  • 码支付源码2024又一款全新码支付源码
  • 国产自主可控新征程:华为原生鸿蒙系统与鲲鹏认证
  • vue中选项式 API(Options API) 和 组合式 API(Composition API)区别