当前位置: 首页 > article >正文

spark读取parquet文件

源码

parquet文件读取的入口是FileSourceScanExec,用parquet文件生成对应的RDD

非bucket文件所以走createNonBucketedReadRDD方法。

createNonBucketedReadRDD

过程:

  1. 确定文件分割参数
    1. openCostInBytes=4M 相关参数spark.sql.files.openCostInBytes=4M
    2. maxSplitBytes<=128M 相关参数spark.sql.files.maxPartitionBytes=128M,根据maxSplitBytes计算得来
    3. logInfo打印的日志可以用于排查参数
  2. 切分文件
    1. splitFiles进行文件切分,按照maxSplitBytes将大文件切分
  3. 切分后文件根据大小进行倒排,为了方便后面合并
  4. 合并partition
    1. getFilePartitions 将小文件合并到一个partition
  5. 生成RDD

maxSplitBytes

  • defaultMaxSplitBytes 最大分区大小=spark.sql.files.maxPartitionBytes=128M
  • openCostInBytes 打开文件的代价 默认4M
  • defaultParallelism 并行度conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) 默认是core的总和,最小为2
  • totalBytes 文件总大小(单个文件大小需要加上openCostInBytes)
  • bytesPerCore 单个core分配的文件大小

最后Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

splitFiles

0L until file.getLen by maxSplitBytesmaxSplitBytes进行文件拆分

getFilePartitions

currentSize += file.length + openCostInBytes计算文件大小的时候需要加上openCostInBytes

计算示例

parquet文件是9,905,218b,并行度是2

defaultMaxSplitBytes = 128MB

openCostInBytes = 4MB

defaultParallelism = max(2, 2) = 2

totalBytes = 9,905,218b+ 1 * 4MB = 14,099,522B

bytesPerCore = 14,099,522B / 2 = 7,049,761B

maxSplitBytes = 7,049,761B = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

文件分成0-7049761 和 7049761-9905218两部分

从下面日志可以知道计算正确。

参考https://developer.aliyun.com/article/985412?utm_content=m_1000349867


http://www.kler.cn/news/366674.html

相关文章:

  • 影刀RPA实战:常见实用功能指令
  • 低代码平台如何通过AI赋能,实现更智能的业务自动化?
  • Springboot 使用EasyExcel导出Excel文件
  • 传奇996_5——使用补丁制作武器
  • 【C++进阶篇】——STL的简介
  • PVE系统无损挂载ntfs格式物理磁盘
  • 常见的音视频格式介绍
  • Cilium + ebpf 系列文章- (七)Cilium-LoadBalancer类型的SVC的IPPool
  • 总裁主题CeoMax-Pro主题7.6开心版
  • 在linux系统中查看具体文件大小命令
  • 【C++习题】12.滑动窗口_将 x 减到 0 的最小操作数
  • 通过Docker Compose构建自己的Java项目
  • 【升华】另一个神经网络学习框架pytorch
  • 智能台灯设计(一)原理图设计
  • 大话网络协议:从OSI七层模型说开去
  • Git的原理和使用(六)
  • android 生成json 文件
  • row_number() over (partition by 分组列 order by 排序列 desc)、row_number() 函数、分组排序函数
  • 计算机网络(十二) —— 高级IO
  • 12_Linux进程管理命令详解
  • python如何通过json以及pickle读写保存数据
  • gin入门教程(9):路由分组与路由版本控制
  • MySQL 存储结构
  • 基于信号分解和多种深度学习结合的上证指数预测模型
  • 基于Multisim的音频放大电路设计与仿真
  • 软体机器人纤维:材料选择有讲究,热拉伸工艺来制造,多种功能应用