从一到无穷大 #35 Velox Parquet Reader 能力边界
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。
文章目录
- 引言
- 源码分析
- 功能描述
- 功能展望
引言
InfluxDB IOX
这样完全不使用索引,只是基于执行引擎与Arrow-rs Parquet Reader
的极致工程化道路无疑是相对极端的,这样的做法在大多数低时间线场景性能基本不可能高于VictoriaMetrics
和Influxdb v1.x
这样的全倒排索引实现,尤其是在筛选条件较多时。
权衡之下,像GreptimeDB
这样对Parquet构建稀疏倒排索引[7]的方案就成了非多引擎下的合理选择,依托于Datafusion
和Arrow-rs
活跃的社区和完备的功能,事实上可以相对低成本,低风险的在各方面指标达到良好的表现,在初创公司的角度来看这当然是一条合理,有效,且高效的道路。
但是世界并不是只由rust构成,也不是所有团队都有资源愿意像InfluxData
一样对DataFusion
,Arrow-rs
这样基础库做大量的投入,并稳操社区的控制权,最后才反哺自己的产品,当然回馈了不少上层软件产品,同时孕育了一众开源的时序数据库产品。
对大多数团队来说如何在现有资源下低风险,高人效的拿成果就成了需要思考的问题。
在21世纪20年代来看,自研数据库计算引擎是一件极高投入,较低回报的事情,算子扩展,并行化,性能提升,稳定性等无不需要大量的精力投入,到最后性能,功能也不及世界顶尖的执行引擎产品,这事实上是基本可预料的,一个数据库产品团队的内核研发能有多少人力去做专用计算引擎呢?这也是
这条路Meta
已经走过了[1],其设计了Velox
用来替换Presto
,Spark
,XStream
等系统的执行引擎,基础语言为CPP。
我们的系统语言为Cpp,在经过技术调研后除去从DuckDB
,Clickhouse
等知名项目中抠执行引擎外,可行的技术选择只剩下了Arrow Acero
和Velox
。Arrow Acero
虽然依托于Arrow-cpp
社区,且愿景宏大,但是整体还处于实验阶段,且没有值得信赖的项目背书。相对之下Velox
确实就成了唯一的选择。
对Velox
研究了一段时间后,认为Velox
满足了90%以上的功能需求,但是部分性能关键点存在缺失,有比较大的修改空间,本篇文章聚集在Velox Parquet Reader
,探究其功能缺失点。
源码分析
Velox
的Velox Parquet Reader
并不是原生的Arrow-cpp
的实现,而是100%自主实现的,官方给出的解释[5]是:
Velox implements a visitor pattern suitable for most columnar file formats. This visitor pattern implements file-format agnostic features like predicate pruning, filter evaluation, etc… There are also some IO optimizations like prefetching and I/O coalescing.
The format-specific implementation (for Parquet, ORC, and DWRF) involves extending these visitor APIs. Wrapping these implementations around apache arrow parquet would involve a lot more work.
Velox implements a visitor pattern suitable for most columnar file formats. This visitor pattern implements file-format agnostic features like predicate pruning, filter evaluation, etc… There are also some IO optimizations like prefetching and I/O coalescing.
Velox
代码中Parquet Reader
(velox/dwio/parquet/reader)的部分只涉及了文件格式的编解码,连Decoder
部分也是公有的,而类似filter pushdown
这样的功能则是通过Decoder
,放在visitor pattern
中去实现。
这样来看,确实实现更多的优化功能会十分麻烦,因为这部分代码需要新增在common部分,且需要新增大量新的公有接口。
先给出一个我自己写的Filter pushdown
的读取Parquet文件样例,example中对这部分描述比较少,而且都是很底层的接口,不研究下代码还是比较难写出来的,我认为对初学者有比较大的学习价值。
void processParquetFile(const std::string& filePath, std::vector<RowVectorPtr>& rowBatches, memory::MemoryPool* pool) {
dwio::common::ReaderOptions readerOpts{pool};
readerOpts.setFileFormat(FileFormat::PARQUET);
auto reader = getReaderFactory(FileFormat::PARQUET)
->createReader(
std::make_unique<BufferedInput>(
std::make_shared<LocalReadFile>(filePath),
readerOpts.memoryPool()),
readerOpts);
if (!reader) {
std::cerr << "Failed to create reader for file: " << filePath << std::endl;
return;
}
RowReaderOptions rowReaderOptions;
auto rowType = ROW({"measurement", "timestamp", "arch", "datacenter", "usage_user"},
{VARCHAR(), TIMESTAMP(), VARCHAR(), VARCHAR(), DOUBLE()});
rowReaderOptions.select(
std::make_shared<facebook::velox::dwio::common::ColumnSelector>(
rowType, rowType->names(), nullptr, false));
auto scanSpec = std::make_shared<facebook::velox::common::ScanSpec>("");
auto untyped = parse::parseExpr("usage_steal >= 3.0", parse::ParseOptions());
auto filterExpr = core::Expressions::inferTypes(untyped, rowType, pool);
std::shared_ptr<core::QueryCtx> queryCtx{core::QueryCtx::create()};
exec::SimpleExpressionEvaluator evaluator{queryCtx.get(), pool};
auto [subfield, filter] = exec::toSubfieldFilter(filterExpr, &evaluator);
auto fieldSpec = scanSpec->getOrCreateChild(subfield);
fieldSpec->addFilter(*filter);
scanSpec->addAllChildFields(*rowType);
rowReaderOptions.setScanSpec(scanSpec);
auto rowReader = reader->createRowReader(rowReaderOptions);
std::cout << "The type of rowReader is: " << typeid(*rowReader).name() << std::endl;
auto rowBatch = BaseVector::create(rowType, 50000, pool);
while (rowReader->next(50000, rowBatch)) {
auto rowVector = std::dynamic_pointer_cast<RowVector>(rowBatch);
if (rowVector) {
std::lock_guard<std::mutex> lock(batchMutex);
rowBatches.push_back(rowVector);
} else {
std::cerr << "Error: Batch is not a RowVector for file: " << filePath << std::endl;
}
}
}
以一个Filter pushdown的流程来入门,整体代码流程如下:
velox/dwio/parquet/reader/ParquetColumnReader.cpp:build
velox/dwio/parquet/reader/StructColumnReader.h:StructColumnReader 在构造函数中构造每一列的column reader
velox/dwio/common/SelectiveStructColumnReader.cpp:next
velox/dwio/common/SelectiveStructColumnReader.cpp:read 从此处处获取每一列的数据
velox/dwio/common/SelectiveStructColumnReader.cpp:getValues 分别调用child的getValues
velox/dwio/parquet/reader/FloatingPointColumnReader.h:read
velox/dwio/common/SelectiveFloatingPointColumnReader.h:readCommon
velox/dwio/common/SelectiveFloatingPointColumnReader.h:processFilter
velox/dwio/common/SelectiveFloatingPointColumnReader.h:readHelper
回调 velox/dwio/parquet/reader/FloatingPointColumnReader.h:readWithVisitor filter作为参数构造ColumnVisitor
velox/dwio/parquet/reader/ParquetData.h:readWithVisitor(Visitor visitor) visitor中包含filter
velox/dwio/parquet/reader/PageReader.h:readWithVisitor
velox/dwio/parquet/reader/PageReader.h:callDecoder
velox/dwio/parquet/reader/PageReader.cpp:seekToPage
velox/dwio/parquet/reader/PageReader.cpp:prepareDataPageV2
velox/dwio/parquet/reader/PageReader.cpp:makeDecoder 构造decode
velox/dwio/dwrf/common/ByteRLE.h:readWithVisitor 各种code被赋值给decode
velox/dwio/common/ColumnVisitors.h:process 最重要的逻辑,判断是否应该过滤列
velox/type/Filter.h:applyFilter
velox/dwio/common/ColumnVisitors.h:filterPassed
velox/dwio/common/ColumnVisitors.h:addResult 然后把下标加入addOutputRow
velox/dwio/common/ColumnVisitors.h:addOutputRow
回调 velox/dwio/common/SelectiveColumnReader.h:addOutputRow 把row加入outputRows_
功能描述
Velox Parquet Reader
基本能力可以表述如下:
功能 | Rust-rs | Velox | 说明 |
---|---|---|---|
RowGroup Parallel Reading | ✅ | ❌ | velox 支持ParquetRowReader查询指定offset limit的数据;基于此机制可以实现RowGroup并行读取,但是需要在TableScan中额外封装,较为复杂,没有底层机制自动实现RowGroup并行读取。 |
Page Index | ✅ | ❌ | offset index + column index |
Split Block Bloom filters (SBBF) | ✅ | ❌ | [11][12] 利用现代 SIMD 指令将速度提高 30%-450% 的布隆过滤器变体 |
Streaming decode | ✅ | ✅ | 允许一次解码一批行后push到上层Operator执行,做到解码和执行形成一个pipeline,这也是Velox的Push执行模型 |
I/O pushdown | ✅ | ❌ | 避免缓冲整个文件,首先获取和解码元数据,然后对相关数据块进行范围提取,并与 Parquet 数据的解码交错,并积极使用各类filter pushdown |
Dictionary preservation | ✅ | ❌ | 在解码期间保留字典,可显著提高读取 Arrow 数组时的性能 |
Vectorized decode | ✅ | ❌ | 一次将多个值解码为列式内存格式 |
Projection pushdown | ✅ | ✅ | 只读选择的列 |
Predicate pushdown | ✅ | ✅ | 条件下推 |
RowGroup pruning | ✅ | ✅ | 基于条件过滤不需要读取的RowGroup |
Page pruning | ✅ | ❌ | 1. velox虽然PageReader中存在skip的函数,但是并不是为了filter push down服务的,filter push down是行级别判断的; 2. velox不支持倒排索引的指定行读取,这个过程也会执行 Page pruning,datafusion就做的很好[8] |
Late materialization | ✅ | ❌ | 多列执行filter后求交集再解码,在行较多,条件较多时可以减少大量的解码开销 |
Pre-Buffer | ✅ | ❌ | 预缓冲原始 Parquet 数据,而不是每个column chunk进行一次读取 |
Parquet V2 | ✅ | ✅ | 列编码和页压缩算法的完全兼容实现 |
自定义Row Filter | ✅ | ❌ | [8][9]此功能允许对Parquet实现自定义的索引,以大幅加速查询性能;但是需要PageIndex,这样就可以基于传入的row在跳过RowGroup的基础上跳过不需要的Page |
总体来看的评价是:
- 基本功能完善
- 性能可想的不够优异
- 架构追求美感,包袱重,很难复用Arrow-cpp
- 不支持倒排索引
其次从部分测试结果看,Velox filter pushdown
存在一些性能问题,还没有filter Operator
快,这个具体的原因后续仔细分析以下,暂时倒不是特别急切。
场景 | Velox 500条件 | Velox 15条件 |
---|---|---|
Q1filter算子 | 3.566s | 0.123s |
Q1filter pushdown | 3.220s | 0.117s |
Q2filter算子 | 0.951s | 0.163s |
Q2filter pushdown | 3.272s | 0.124s |
Q3filter算子 | 1.455s | 0.116s |
Q3filter pushdown | 3.851s | 0.142s |
功能展望
我们对于Velox Parquet Reader
的核心诉求是支持Parquet
级别的倒排索引,这样可以无缝适配到TableScan
算子的异步Push
模式中去。执行引擎本身当然也需要改一些地方,但都可以逐步迭代,并不急切。
我改了一版的Velox
代码,以支持Parquet Reader
中Row
级别的过滤,只需要在AddSplit
时添加文件对应的RowFilter
信息即可,接口部分模仿DataFusion+Arrow-rs
,但是还有部分测试和代码优化工作需要细化;
肉眼可见的,Velox
社区还有大量的核心特性的贡献机会,虽然Meta
的开源社区维护一直被人诟病,但是有Presto
,Spark
背书近五年到不必担心项目爆雷。
参考:
- 从一到无穷大 #26 Velox:Meta用cpp实现的大一统模块化执行引擎
- https://blog.mwish.me/
- Velox: Meta’s Unified Execution Engine vldb2022
- Querying Parquet with Millisecond Latency
- [Design] Native Parquet Reader
- PARQUET-1820: [C++] pre-buffer specified columns of row group #6744
- GrepTimeDB index
- advanced_parquet_index.rs
- parquet/src/arrow/arrow_reader/selection.rs
- Faster C++ Apache Parquet performance on dictionary-encoded string data coming in Apache Arrow 0.15
- Parquet SBBF
- Split block Bloom filters