【Flink系列五】Checkpoint及Barrier原理
本章内容
- 一致性检查点
- 从检查点恢复状态
- 检查点实现算法-barrier
- 保存点Savepoint
- 状态后端(state backend)
本文先设置一个前提,流处理的数据都是可回放的(可以理解成消费的kafka的数据)
一致性检查点(checkpoints)
图1
- checkpoint是Flink故障恢复的核心,全称是应用状态的一致性检查点
- 有状态流应用的一致性检查点,其实就是所有任务处理完数据的状态,在某个时间点的一份拷贝(一份快照,存储在状态后端),这个时间点,应用是所有任务能恰好处理完一个相同的输入数据的时候
(图1中不考虑时间,假设1、2、3、4、5、6、7为source源,even为偶数6=2+4,odd为奇数求和9=1+3+5,此时5这个数据在所有tasks都处理完成了,每个任务都会提交一份快照给JM,最终这份拓扑结构(source任务状态是5、sum_even状态是6、sum_odd状态是9)称为checkpoint)
从检查点恢复状态
图2
- 在执行流应用期间,Flink会定期保存状态的一致性检查点
- 如果发生故障,Flink会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
(假设处理到7这个数据的时候,sum_even=2+4+6=12,sum_odd在处理7这个数据的时候fail了,应该如果恢复数据呢)
第一步:遇到故障之后,重启受影响的应用,应用重启的之后,所有任务的状态都是空的
图3
第二步:从checkpoint中读取状态,将状态重置,从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同(回到了和图1相同的状态,如果算子设置了并行度,也可以恢复)。恢复后,source任务必须从检查点恢复的结果后开始读取数据(必须从6开始读取数据)
图4
第三步:开始消费并处理检查点到发生故障之间的所有数据。(处理完7后,sum_even=2+4+6=12,sum_odd=1+3+5+7=16, 所有tasks都处理完后,又会提交一个checkpoint)
图5
这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有的算子都会保存检查点并恢复其所有状态,这样依一来所有的输入流就都会被重置到检查点完成时的位置。
检查点的实现算法
基于Chandy-Lamport算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用
思考一个问题:flink如何判断某个数据已经处理完了呢?(比如图1的offset=5的数据)
答案:是否在每个数据后面跟一个标记,当读到这个标记的时候触发task状态的保存
检查点分界线(checkpoint barrier)
- Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
- 分界线之前到来的数据导致的状态更改,都会包含在当前分界线所属的检查点中;二基于分界线之后的数据导致的所有更改,就会包含在之后的检查点中
图6
barrier有很多叫法,如检查点屏障等
分析一下barrier的工作流程,假设现在有这样的一个场景:有两个输入流的应用程序,用并行的两个source任务来读取(可以认为kafka的两个分区,source并行度设置为2),如图7所示。barrier也是和watermark一样,都是通过广播的方式传递给下游算子
图7
(source任务的并行度=2,sum任务的并行度也是2,sink任务的并行度也是2。)
如图7,两个流的数据都是1、2、3、4、5、6;蓝色数字圆圈代表最后一个处理的是蓝流里面的数据,黄色数字圆圈代表最后一个处理的是黄流里面的数据。
图8
图8中两条流的情况下,barrier如何传递呢?(watermark是取上游分区的最小值)下面一起来看一下
图9
barrier是怎么产生的?
答:JobManager会向每个source任务(同时发给并行的source任务)发送一条带有检查点ID的消息(蓝色三角形2),通过这种方式来启动检查点。产生barrier的过程中,不会影响下游task的正常工作(图9相比图8黄2和蓝2都sink完成了)图9中barrier(ID=2)插入在stream1的3后面,stream2的4后面
图10
barrier随着数据流动,广播到下游,source任务处理完barrier(ID=2)后,会向状态后端发送checkpoint,记录此时的状态。图10相比图9蓝3和黄4都被sum任务处理了。
- 数据源将他们的状态写入检查点,并发出一个检查点barrier
- 状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
sum_even收到上游所有的barrier之后,才能去做checkpoint状态保存,这就叫做Barrier对齐(分分界线对齐)
图11
- 分界线对齐:barrier向下游传递,sum任务会等待所有的输入分区的barrier到达
- 对于barrier已经到达的分区,继续到达的数据会被缓存
- 而barrier尚未到达的分区,数据会被正常处理
图11中的sum_even中的蓝4需要被缓存,因为来自上游任务的黄色barrier(ID=2)还未到达。(stream1有可能在同一个slot,stream2和stream1跨slot,可能barrier到达的时间会不一致)
图12
- 当收到所有分区的barrier时,任务就讲其状态保存到状态后端的检查点中,然后barrier继续向下游广播
图12中,barrier(ID=2)继续向下游广播。此时蓝色4会从缓存中拿出来做接下来的计算
图13
图13中,sum_even处理完4+8=12,以及4+6+8=18,任务开始正常的数据处理
图14
- sink任务向JobManager确认状态保存到checkpoint完毕
- 当所有的任务都确认已经成功将状态保存到检查点时,检查点就真正完成了(3-4-8-8拓扑保存完成)
最终JobManager会向所有的任务确认task的状态是否正确,确认完成后任务完成。
保存点
- Flink还提供了自定义的镜像保存功能,就是保存点(savepoints)
- 原则上,创建保存点使用的算法与检查点的完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
- Flink不会自动创建保存点,因此用户(或者外部调度系统)必须明确的触发创建操作
- 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
状态后端
Flink 提供了三种可用的状态后端用于在不同情况下进行状态的保存
-
MemoryStateBackend
内存级的状态后端,将监控状态作为内存中的对象进行管理,将他们存储在TM的JVM堆上,而将checkpoint存储在JM的内存中
-
FsStateBackend
将checkpoint存储到远程的持久化系统FileSystem中,而对于本地状态,和MemotyStateBackend一样,也会存储在TM的JVM堆上
-
RocksDBStateBackend
将所有的状态序列化后,存入本地的RocksDB中(注意:RocksDb的支持并不直接包含在Flink中,需要引入依赖),RocksDBStateBackend 是唯一支持增量快照的状态后端。
后续补充具体的代码