当前位置: 首页 > article >正文

从一到无穷大 #35 Velox Parquet Reader 能力边界

在这里插入图片描述本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。

文章目录

  • 引言
  • 源码分析
  • 功能描述
  • 功能展望

引言

InfluxDB IOX这样完全不使用索引,只是基于执行引擎与Arrow-rs Parquet Reader的极致工程化道路无疑是相对极端的,这样的做法在大多数低时间线场景性能基本不可能高于VictoriaMetricsInfluxdb v1.x这样的全倒排索引实现,尤其是在筛选条件较多时。

权衡之下,像GreptimeDB这样对Parquet构建稀疏倒排索引[7]的方案就成了非多引擎下的合理选择,依托于DatafusionArrow-rs活跃的社区和完备的功能,事实上可以相对低成本,低风险的在各方面指标达到良好的表现,在初创公司的角度来看这当然是一条合理,有效,且高效的道路。

但是世界并不是只由rust构成,也不是所有团队都有资源愿意像InfluxData一样对DataFusionArrow-rs这样基础库做大量的投入,并稳操社区的控制权,最后才反哺自己的产品,当然回馈了不少上层软件产品,同时孕育了一众开源的时序数据库产品。

对大多数团队来说如何在现有资源下低风险,高人效的拿成果就成了需要思考的问题。

在21世纪20年代来看,自研数据库计算引擎是一件极高投入,较低回报的事情,算子扩展,并行化,性能提升,稳定性等无不需要大量的精力投入,到最后性能,功能也不及世界顶尖的执行引擎产品,这事实上是基本可预料的,一个数据库产品团队的内核研发能有多少人力去做专用计算引擎呢?这也是

这条路Meta已经走过了[1],其设计了Velox用来替换PrestoSparkXStream等系统的执行引擎,基础语言为CPP。

我们的系统语言为Cpp,在经过技术调研后除去从DuckDBClickhouse等知名项目中抠执行引擎外,可行的技术选择只剩下了Arrow AceroVeloxArrow Acero虽然依托于Arrow-cpp社区,且愿景宏大,但是整体还处于实验阶段,且没有值得信赖的项目背书。相对之下Velox确实就成了唯一的选择。

Velox研究了一段时间后,认为Velox满足了90%以上的功能需求,但是部分性能关键点存在缺失,有比较大的修改空间,本篇文章聚集在Velox Parquet Reader,探究其功能缺失点。

源码分析

VeloxVelox Parquet Reader并不是原生的Arrow-cpp的实现,而是100%自主实现的,官方给出的解释[5]是:

Velox implements a visitor pattern suitable for most columnar file formats. This visitor pattern implements file-format agnostic features like predicate pruning, filter evaluation, etc… There are also some IO optimizations like prefetching and I/O coalescing.
The format-specific implementation (for Parquet, ORC, and DWRF) involves extending these visitor APIs. Wrapping these implementations around apache arrow parquet would involve a lot more work.
Velox implements a visitor pattern suitable for most columnar file formats. This visitor pattern implements file-format agnostic features like predicate pruning, filter evaluation, etc… There are also some IO optimizations like prefetching and I/O coalescing.

Velox代码中Parquet Reader(velox/dwio/parquet/reader)的部分只涉及了文件格式的编解码,连Decoder部分也是公有的,而类似filter pushdown这样的功能则是通过Decoder,放在visitor pattern中去实现。

这样来看,确实实现更多的优化功能会十分麻烦,因为这部分代码需要新增在common部分,且需要新增大量新的公有接口。

先给出一个我自己写的Filter pushdown的读取Parquet文件样例,example中对这部分描述比较少,而且都是很底层的接口,不研究下代码还是比较难写出来的,我认为对初学者有比较大的学习价值。

void processParquetFile(const std::string& filePath, std::vector<RowVectorPtr>& rowBatches, memory::MemoryPool* pool) {
    dwio::common::ReaderOptions readerOpts{pool};
    readerOpts.setFileFormat(FileFormat::PARQUET);
    auto reader = getReaderFactory(FileFormat::PARQUET)
                      ->createReader(
                          std::make_unique<BufferedInput>(
                              std::make_shared<LocalReadFile>(filePath),
                              readerOpts.memoryPool()),
                          readerOpts);
    if (!reader) {
        std::cerr << "Failed to create reader for file: " << filePath << std::endl;
        return;
    }

    RowReaderOptions rowReaderOptions;
    auto rowType = ROW({"measurement", "timestamp", "arch", "datacenter", "usage_user"},
                       {VARCHAR(), TIMESTAMP(), VARCHAR(), VARCHAR(), DOUBLE()});
    rowReaderOptions.select(
      std::make_shared<facebook::velox::dwio::common::ColumnSelector>(
        rowType, rowType->names(), nullptr, false));
    auto scanSpec = std::make_shared<facebook::velox::common::ScanSpec>("");
  
    auto untyped = parse::parseExpr("usage_steal >= 3.0", parse::ParseOptions());
    auto filterExpr = core::Expressions::inferTypes(untyped, rowType, pool);
    std::shared_ptr<core::QueryCtx> queryCtx{core::QueryCtx::create()};
    exec::SimpleExpressionEvaluator evaluator{queryCtx.get(), pool};
    auto [subfield, filter] = exec::toSubfieldFilter(filterExpr, &evaluator);
    auto fieldSpec = scanSpec->getOrCreateChild(subfield);
    fieldSpec->addFilter(*filter);

    scanSpec->addAllChildFields(*rowType);
    rowReaderOptions.setScanSpec(scanSpec);
    auto rowReader = reader->createRowReader(rowReaderOptions);
    std::cout << "The type of rowReader is: " << typeid(*rowReader).name() << std::endl;

    auto rowBatch = BaseVector::create(rowType, 50000, pool);

    while (rowReader->next(50000, rowBatch)) {
        auto rowVector = std::dynamic_pointer_cast<RowVector>(rowBatch);
        if (rowVector) {
            std::lock_guard<std::mutex> lock(batchMutex);
            rowBatches.push_back(rowVector);
        } else {
            std::cerr << "Error: Batch is not a RowVector for file: " << filePath << std::endl;
        }
    }
}

以一个Filter pushdown的流程来入门,整体代码流程如下:

velox/dwio/parquet/reader/ParquetColumnReader.cpp:build
velox/dwio/parquet/reader/StructColumnReader.h:StructColumnReader 在构造函数中构造每一列的column reader
velox/dwio/common/SelectiveStructColumnReader.cpp:next
velox/dwio/common/SelectiveStructColumnReader.cpp:read 从此处处获取每一列的数据
velox/dwio/common/SelectiveStructColumnReader.cpp:getValues 分别调用child的getValues
velox/dwio/parquet/reader/FloatingPointColumnReader.h:read 
    velox/dwio/common/SelectiveFloatingPointColumnReader.h:readCommon
    velox/dwio/common/SelectiveFloatingPointColumnReader.h:processFilter
    velox/dwio/common/SelectiveFloatingPointColumnReader.h:readHelper
    回调 velox/dwio/parquet/reader/FloatingPointColumnReader.h:readWithVisitor filter作为参数构造ColumnVisitor
    velox/dwio/parquet/reader/ParquetData.h:readWithVisitor(Visitor visitor)  visitor中包含filter
    velox/dwio/parquet/reader/PageReader.h:readWithVisitor
    velox/dwio/parquet/reader/PageReader.h:callDecoder
        velox/dwio/parquet/reader/PageReader.cpp:seekToPage 
        velox/dwio/parquet/reader/PageReader.cpp:prepareDataPageV2
        velox/dwio/parquet/reader/PageReader.cpp:makeDecoder 构造decode
    velox/dwio/dwrf/common/ByteRLE.h:readWithVisitor 各种code被赋值给decode
    velox/dwio/common/ColumnVisitors.h:process 最重要的逻辑,判断是否应该过滤列
    velox/type/Filter.h:applyFilter
    velox/dwio/common/ColumnVisitors.h:filterPassed
    velox/dwio/common/ColumnVisitors.h:addResult 然后把下标加入addOutputRow
    velox/dwio/common/ColumnVisitors.h:addOutputRow 
    回调 velox/dwio/common/SelectiveColumnReader.h:addOutputRow 把row加入outputRows_

功能描述

Velox Parquet Reader基本能力可以表述如下:

功能Rust-rsVelox说明
RowGroup Parallel Readingvelox 支持ParquetRowReader查询指定offset limit的数据;基于此机制可以实现RowGroup并行读取,但是需要在TableScan中额外封装,较为复杂,没有底层机制自动实现RowGroup并行读取
Page Indexoffset index + column index
Split Block Bloom filters (SBBF)[11][12] 利用现代 SIMD 指令将速度提高 30%-450% 的布隆过滤器变体
Streaming decode允许一次解码一批行后push到上层Operator执行,做到解码和执行形成一个pipeline,这也是Velox的Push执行模型
I/O pushdown避免缓冲整个文件,首先获取和解码元数据,然后对相关数据块进行范围提取,并与 Parquet 数据的解码交错,并积极使用各类filter pushdown
Dictionary preservation在解码期间保留字典,可显著提高读取 Arrow 数组时的性能
Vectorized decode一次将多个值解码为列式内存格式
Projection pushdown只读选择的列
Predicate pushdown条件下推
RowGroup pruning基于条件过滤不需要读取的RowGroup
Page pruning1. velox虽然PageReader中存在skip的函数,但是并不是为了filter push down服务的,filter push down是行级别判断的; 2. velox不支持倒排索引的指定行读取,这个过程也会执行 Page pruning,datafusion就做的很好[8]
Late materialization多列执行filter后求交集再解码,在行较多,条件较多时可以减少大量的解码开销
Pre-Buffer预缓冲原始 Parquet 数据,而不是每个column chunk进行一次读取
Parquet V2列编码和页压缩算法的完全兼容实现
自定义Row Filter[8][9]此功能允许对Parquet实现自定义的索引,以大幅加速查询性能;但是需要PageIndex,这样就可以基于传入的row在跳过RowGroup的基础上跳过不需要的Page

总体来看的评价是:

  1. 基本功能完善
  2. 性能可想的不够优异
  3. 架构追求美感,包袱重,很难复用Arrow-cpp
  4. 不支持倒排索引

其次从部分测试结果看,Velox filter pushdown存在一些性能问题,还没有filter Operator快,这个具体的原因后续仔细分析以下,暂时倒不是特别急切。

场景Velox 500条件Velox 15条件
Q1filter算子3.566s0.123s
Q1filter pushdown3.220s0.117s
Q2filter算子0.951s0.163s
Q2filter pushdown3.272s0.124s
Q3filter算子1.455s0.116s
Q3filter pushdown3.851s0.142s

功能展望

我们对于Velox Parquet Reader的核心诉求是支持Parquet级别的倒排索引,这样可以无缝适配到TableScan算子的异步Push模式中去。执行引擎本身当然也需要改一些地方,但都可以逐步迭代,并不急切。

我改了一版的Velox代码,以支持Parquet ReaderRow级别的过滤,只需要在AddSplit时添加文件对应的RowFilter信息即可,接口部分模仿DataFusion+Arrow-rs,但是还有部分测试和代码优化工作需要细化;

肉眼可见的,Velox社区还有大量的核心特性的贡献机会,虽然Meta的开源社区维护一直被人诟病,但是有PrestoSpark背书近五年到不必担心项目爆雷。

参考:

  1. 从一到无穷大 #26 Velox:Meta用cpp实现的大一统模块化执行引擎
  2. https://blog.mwish.me/
  3. Velox: Meta’s Unified Execution Engine vldb2022
  4. Querying Parquet with Millisecond Latency
  5. [Design] Native Parquet Reader
  6. PARQUET-1820: [C++] pre-buffer specified columns of row group #6744
  7. GrepTimeDB index
  8. advanced_parquet_index.rs
  9. parquet/src/arrow/arrow_reader/selection.rs
  10. Faster C++ Apache Parquet performance on dictionary-encoded string data coming in Apache Arrow 0.15
  11. Parquet SBBF
  12. Split block Bloom filters

http://www.kler.cn/news/314855.html

相关文章:

  • 计算机基础知识笔记
  • 基于协同过滤+python+django+vue的音乐推荐系统
  • 鸿蒙Harmony-Next 徒手撸一个日历控件
  • Qt中样式表常用的属性名称定义
  • 利用Python与Ansible实现高效网络配置管理
  • 【Harmony】轮播图特效,持续更新中。。。。
  • Ubuntu24.04 安装ssh开启22端口及允许root用户远程登录
  • 【Flink实战】flink消费http数据并将数组展开多行
  • linux-虚拟化与容器化-虚拟化
  • 无法删除选定的端口,不支持请求【笔记】
  • Java流程控制语句——跳转语句详解:break 与 continue 有什么区别?
  • Go 并发模式:管道的妙用
  • biopython解析mmcif文件得到组装体、链、序列、原子坐标、变换矩阵等信息
  • 统信服务器操作系统【1050e版】安装手册
  • 十个服务器中毒的常见特征及其检测方法
  • elasticsearch学习与实战应用
  • 音视频生态下Unity3D和虚幻引擎(Unreal Engine)的区别
  • T4—猴痘识别
  • Qwen2-VL的微调及量化
  • React【1】【ref常用法】
  • 小程序地图展示poi帖子点击可跳转
  • 20240921在友善之臂的NanoPC-T6开发板上使用Rockchip原厂的Android12适配宸芯的数传模块CX6602N
  • 【监控】【Nginx】使用 ELK Stack 监控 Nginx
  • Docker Compose 启动 PostgreSQL 数据库
  • 《在华为交换机上配置防止 ARP 攻击》
  • 一个基于 Tauri、Vite 5、Vue 3 和 TypeScript 构建的即时通讯系统,牛啊牛啊!(附源码)
  • 无人机助力智慧农田除草新模式,基于YOLOv10全系列【n/s/m/b/l/x】参数模型开发构建无人机航拍场景下的农田杂草检测识别系统
  • 分布式变电站电力监控系统
  • EmptyDir-数据存储
  • gis专业怎么选电脑?