当前位置: 首页 > article >正文

Flink RocksDB状态缩放加速:RocksDB原生DeleteRange原理简析

又见Rescale

笔者在很久之前的一篇文章(传送门)中讲解过Flink的状态缩放(Rescale)和键组(Key Group)设计,相信各位看官对下面这张图已经很熟悉了。

简言之,Flink通过引入Key Group,将状态Rescale时从远端DFS恢复数据的操作从随机读尽量优化成顺序读,I/O瓶颈大大减轻。

但是,当Flink应用的Sub-task和状态Key非常多时,改变有状态算子的并行度仍然可能要花费较长时间恢复。例如我们负责的比较大的一个Flink任务(版本1.14),状态Key总数接近10亿,并行度从240扩容到480,在TaskManager资源充足且HDFS吞吐无瓶颈的情况下,状态数据完全恢复也需要20+分钟时间。除了数据量大之外,还有一个关键原因是:并行度增加后,新的Sub-task拿到原来的状态数据,需要将不属于自己的Key Group裁剪掉,否则会与其他Sub-task冲突。以上图为例,扩容后的Sub-task 1需要将扩容前Sub-task 0和1的状态文件都恢复到RocksDB,并且裁剪掉KG-1KG-2KG-5KG-6的数据,只保留KG-3KG-4。而我们知道,RocksDB的删除操作是产生Tombstone记录,本质上与写入无异,所以这种情况下TaskManager本地磁盘仍然有较大的I/O压力。

不过,上述问题在Flink 1.16有了一定改善,因为RocksDB状态后端运用了RocksDB原生的DeleteRange API来快速删除指定区间内的Keys,在我们的实测中,大状态任务恢复速度最多可以提升60%。下面讨论DeleteRange的实现原理。

RocksDB原生DeleteRange原理

在没有DeleteRange API的时候,区间删除只能采用传统的迭代器遍历操作:

Slice start, end;
auto it = db->NewIterator(ReadOptions());
for (it->Seek(start); cmp->Compare(it->key(), end) < 0; it->Next()) {
  db->Delete(WriteOptions(), it->key());
}

有了DeleteRange API,就简单很多:

Slice start, end;
db->DeleteRange(WriteOptions(), start, end);

为了实现区间删除,RocksDB在原始的MemTable(称为Point-key MemTable)之外,又新增了Range Tombstone MemTable,专门缓存区间删除的数据。同理,在SST文件中也对应新增了包含区间删除信息的元数据块Range Tombstone Block(Seqnum为写入序列号)。如下两图所示。

由此可见,如果我们要删除包含10000个连续Key的集合,传统方式会产生10000个Tombstone,而DeleteRange方式只会产生1个Range Tombstone,能够有效降低读写放大。

在写入过程中,Range Tombstone也需要参与Compaction流程,以及时删除无效Tombstone。此处细节很多,简单概括来讲:

  • 在Compaction开始时,收集所有源SST文件的Range Tombstone区间,形成一个包含所有区间删除Key的最小堆。
  • 对于每个输入Key,判断它是Merge类型还是Put类型:Merge操作则将该Key的所有历史版本合并,如果历史版本没有被Snapshot引用,则可以删除对应的Tombstone;Put操作说明该Key是新写入数据,所有Tombstone都可以被清理掉。
  • 清理完成后,将剩余的有效Tombstone重新写回新SST文件的Range Tombstone Block。

引入Range Tombstone后,RocksDB读取操作面临一个新问题:如何快速判断要读取的Key是否位于某个已经标记删除的区间中?答案是分段(RocksDB内部称为"Fragmentation"),本质上与天际线问题(The Skyline Problem)的解法相同,见Leetcode 218。

如上图所示,在RocksDB的语义下,X轴表示Key,Y轴表示Seqnum。在打开一个SST文件时,RocksDB会扫描该文件中所有的Range Tombstone区间(图A中不同颜色的色块),并将它们整合成互不重叠的子区间。将这些子区间按照左值升序排序并缓存下来(图B),就可以根据Key进行高效的二分查找了。

Flink对DeleteRange的运用

回到Flink,以1.16版本为例,当任务发生Rescale并从状态恢复到RocksDB时,实际上是调用RocksDBIncrementalRestoreOperation#restoreWithRescaling()方法:

private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles)
            throws Exception {

        // Prepare for restore with rescaling
        KeyedStateHandle initialHandle =
                RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
                        restoreStateHandles, keyGroupRange, overlapFractionThreshold);

        // Init base DB instance
        if (initialHandle != null) {
            restoreStateHandles.remove(initialHandle);
            initDBWithRescaling(initialHandle);
        } else {
            this.rocksHandle.openDB();
        }
    // ..................................
    }

其中,RocksDBIncrementalCheckpointUtils#chooseTheBestStateHandleForInitial()方法负责从所有要恢复的状态句柄中,尽量选择出与当前Sub-task负责的KeyGroupRange重合比例最高的一个,用来初始化本地RocksDB实例,以尽量降低后续裁剪的压力。

接下来通过initDBWithRescaling()方法调用RocksDBIncrementalCheckpointUtils#clipDBWithKeyGroupRange()方法,按照KeyGroupRange的范围进行裁剪。从文章开头的图示可知,由于Key Group已经是有序的,因此在扩容的情况下,新Sub-task不再负责的Key Group一定位于头尾,因此只需要比较两者的startKeyGroupendKeyGroup即可。

public static void clipDBWithKeyGroupRange(
            @Nonnull RocksDB db,
            @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
            @Nonnull KeyGroupRange targetKeyGroupRange,
            @Nonnull KeyGroupRange currentKeyGroupRange,
            @Nonnegative int keyGroupPrefixBytes)
            throws RocksDBException {

        final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(
                    currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(
                    targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }

        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(
                    targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(
                    currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
    }

deleteRange()方法就是代理了RocksDB JNI的同名方法,进行高效的区间删除。

private static void deleteRange(
            RocksDB db,
            List<ColumnFamilyHandle> columnFamilyHandles,
            byte[] beginKeyBytes,
            byte[] endKeyBytes)
            throws RocksDBException {

        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            // Using RocksDB's deleteRange will take advantage of delete
            // tombstones, which mark the range as deleted.
            //
            // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
            db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
        }
    }

读者也可以手动翻一下Flink 1.14或更早版本的源码, deleteRange()方法是通过遍历Key进行前缀比较,并执行WriteBatch操作批量删除不符合条件的Key,相比原生DeleteRange的效率要低很多。

The End


http://www.kler.cn/a/453989.html

相关文章:

  • http 请求总结get
  • Nginx的性能分析与调优简介
  • ArcGIS土地利用数据制备、分析及基于FLUS模型土地利用预测(数据采集、处理、分析、制图)
  • golang标准库SSH操作示例
  • 编码转换(实例)
  • 重温设计模式--观察者模式
  • 云原生相关的 Go 语言工程师技术路线(含博客网址导航)
  • JAVAweb学习日记(三)Ajax
  • Android view 基本的绘制流程
  • 记录Linux Centos 7 安装PostgreSQL 16
  • JZ31 栈的压入、弹出序列
  • Windows脚本命令与Linux Bash脚本命令
  • xctf-WEB-新手练习区Exercise area-Writeup
  • 2024年12月一区SCI-加权平均优化算法Weighted average algorithm-附Matlab免费代码
  • BP回归-反向传播(Backpropagation)
  • 【git】配置ssh代理
  • 人工智能与大数据:商贸物流变革的双引擎与挑战应对
  • 软件设计与体系结构
  • 消费企业如何提升主动造血能力?会员精细化运营是关键!
  • 面试知识点汇总_05
  • linux提示结构需要清理
  • nodejs操作达梦数据库的封装
  • 基于YOLOV5+Flask安全帽RTSP视频流实时目标检测
  • 移植 OLLVM 到 Android NDK,Android Studio 中使用 OLLVM
  • 【开源免费】基于SpringBoot+Vue.JS植物健康系统(JAVA毕业设计)
  • 1847. 最近的房间