04. Flink的状态管理与容错机制
本文主要介绍下Flink中的状态种类,Flink的检查点机制,状态后端,端到端的一致性保证还有Flink的任务重启和恢复策略
1.Flink中的状态分类
flink的状态主要分为,KeyedState,OperatorState,BroadcastState三类,其中KeyedState和BroadcastState是比较常用的,
-
keyedState: 又分为ValueState,ListState,ReducingState,AggregatingState,MapState这些state看名字就知道它们的作用这里就不多说了,需要注意下的是ReducinngState和AggregationState这两个state都是聚合数据的state而且都是通过add方法添加数据,数据的计算逻辑 按照定义的方法计算,不同的是AggregationState输入和输出可以是不同的类型,如图介绍下它的具体使用方法
-
在open()里定义state的描述器并初始化它
-
在processElement()这样的富函数中根据数据流中的每个元素使用和添加新元素到状态中
-
BroadcastState: 它一般用于小规模数据集需要实时更新的应用场景,它的具体使用方法如下图所示,通过DataStream流调用broadcast方法,这就定义好了广播流,然后通过继承BroadcastProcessFunction抽象类,实现对应的processElement()和processBroadcastElement()方法即可
2. Flink CheckPoint
checkpoint机制是flink实现容错和故障恢复的基石,它是通过间歇性的往正常的数据流中插入一种叫barrier的特殊数据流,barrier会随着正常数据流往各个算子中流转,当barrier流转到当前算子的时候,此算子就会把它自己运行时所有的状态保存到本地,就这样一直到barrier流转到最后一个算子并且它也做好了相应的状态保存,此时所有算子对应的TM就会通知JM此次分布式快照已经完成,JM就会做全局的checkpoint操作。就这样如果发生故障需要恢复,只需把所有算子的状态恢复到最近一次成功的checkpoint中记录的状态即可。
2.1 多并行度下的checkpoint机制
上面讲的是单个并行度的情况,在多并行度的情况下checkpoint机制是不一样的,需要注意的是,barrier往下游流转时是按照一个一个算子往下流的。所以就会出现上游流转到下游并行度变大这种情况,此时上游是通过广播流的形式把上游barrier广播到下游的所有分区中完成barrier到下个算子的流转,如果是上游多个分区往下游少数分区中发,此时就需要对下游的barrier进行对齐才行
- barrier对齐: 在多并行度的情况下,各个并行度的性能不一样,到达下游的时间肯定会有差异,barrier对齐机制就要求,先到达的需要阻塞住自己的正常数据流,等最慢的并行度也到达这个算子才可以做算子状态的本地化存储操作。如果慢的并行度比较慢的话,这就会导致数据流整体的流速减慢,导致数据的反压,也会导致checkpoint时间延长等等恶性循环。
- barrier不对齐: 针对上面出现的问题,flink引入了barrier不对齐机制,它有两种实现方式
- exactly Once(精准一次)语义: 假设有3个并行度,往map算子里流,其中2号并行度优先达到map算子,此时map算子立马进行本地快照,同时记录2号并行度在map中的状态,然后记录1号和3号并行度barrier和map之间的所有元素(inputbuffers),还会记录此时经过map计算往下游输出的元素(output buffers),最后把1号和3号barrier停掉,到此为止map算子的一次性快照完成。
- at least Once(至少一次)语义:假设也是3个并行度往map算子里流,也是2号算子先到达map算子,此时barrier不阻塞,也不做本地快照,2号并行度barrier继续往下流转,一直到3个并行度中最慢的那个并行度也到了map之后才开始做本地快照,此时本地快照记录的是各个并行度在map中的状态,但是此刻快的并行度处理的数据肯定比它本身barrier到时刻处理多一些元素,这些元素在状态重启的时就会出现重复消费的情况,这也是符合至少一次消费的语义。
- exactly Once(精准一次)语义: 假设有3个并行度,往map算子里流,其中2号并行度优先达到map算子,此时map算子立马进行本地快照,同时记录2号并行度在map中的状态,然后记录1号和3号并行度barrier和map之间的所有元素(inputbuffers),还会记录此时经过map计算往下游输出的元素(output buffers),最后把1号和3号barrier停掉,到此为止map算子的一次性快照完成。
2.2 checkpoint的设置参数
- checkpoint存储: CK可以主要的存储地方是内存和文件系统中,具体参考状态后端章节
env.getCheckpintConfig().setCheckpointStorage("hdfs://mycluster/flink/checkpoints");
- checkpoint模式设置: CK的模式中常用的有如下几种,exactly-once 语义保证了端到端的一致,在数据要求比较高的场景(不能丢数据),但是它的性能相对较弱,两一种是at-least-once语义更适合时延和吞吐要求非常高但数据一致性要求不高的场景
env.getCheckpointConfig().setCheckpointMode(CheckpointMode.AT_LEAST_ONCE);
- 其他重要参数:
- 指定完成CK的超时时间:env.getCheckpointConfig().setCheckpointTimeout(10601000)默认10分钟
- 指定两个CK之间的时间间隔:env.getCheckpointConfig().setCheckpointPauseBetweenCheckpoint(500);默认是0
- CK的最大并行度:env.getCheckpointConfig().setMaxConcurrentCheckpoints(1),默认并行度是1
- 可容忍CK的失败次数:env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0)没有默认值
- CK不对齐设置:env.getCheckpointConfig().enableUnalignedCheckpoints()
3 状态后端
Checkpoint的的保存过程都是由状态后端协调和管理,具体来说状态后端有两个主要职责:
- TaskManager本地状态管理: 状态后端负责任务在每个TM上的本地状态,并提供了状态的存储和访问接口
- 检查点写入远程持久化存储: 它负责把Checkpoint的数据存储到指定的远程持久化存储介质中
3.1 状态后端分类
Flink提供了HashMapStateBackend,FSStateBackend,RockDBStateBackend三种状态后端
- HashMapStateBackend: 基于Flink的堆内存存储状态数据,特点是高效,但是容易内存溢出和状态数据丢失等问题
- FSStateBackend: 基于文件系统存储状态,可以是本地文件系统也可以是HDFS这样的分布式系统,特点是适合存储任务状态非常大的场景,但是效率相对低下
- RocksDBStateBackend: 基于RockDB这样一个存储系统,也是把数据存储在本地,同时它比单纯的文件系统在性能上要好,从性能上来说是介于FSStateBackend和RockDBStateBackend之间,同时它是采用增量方式对快照进行存储的,基于以上几点推荐在生产环境中使用
4 Flink端到端一致性保证
4.1 数据处理的语义
分布式系统从严格程度会有以下三种语义,分别是,exactly-once,at-least-once,at-most-once
- exactly-once: 它要求每笔数据都是精准一次的被处理
- at-least-once: 要求数据至少被消费一次
- at-most-once: 数据至多被消费一次
4.2 端到端的数据处理一致性
Flink中数据的一致性指数据的输入,Flink系统内部和数据的输出整体的一致性,系统内部和数据输入的一致性实现相对简单,数据输出的一致性当对于复杂
- 输入端的一致性: 只需要保证数据源能按照不变的顺序重放数据即可,典型的就是文件系统是可以的
- Flink系统内部处理的一致性: 通过checkpoint机制就可以保证
- 输出端数据一致性: 由于数据输出到外部系统之后,成功与否本程序是无法保证的,所有就要求输出端必须具备幂等或支持事务。幂等是指重复写入相同数据只影响一次结果,如Redis写入HBase写入,然而幂等写入的场景比较受限,所以使用事务方式保证输出端数据一致性是主要的思路,Flink中通过两阶段提交的方式实现
5 Flink的两阶段提交
两阶段提交主要是针对支持事务的外部系统实现,输出端一致性的机制,顾名思义它主要是分为两个阶段,分别是pre-commit和commit阶段,Flink在具体实现上是通过结合checkpoint机制一起实现的。
总的来说它是把外部的事务当成是系统内部状态,然后通过checkpoint机制实现,具体实现步骤如下:
- pre-commit阶段: 当checkpoint机制往数据流中插入barrier的时候pre-commit阶段开始,直到barrier流经整个job的最后一个算子,并且所有的算子都做好本地的快照,此时预提交阶段结束
- commit阶段: 在JobManager收到所有算子都完成对应的本地快照之后,它会进行全局的checkpoint操作,当全局的checkpoint完成,此时commit阶段开始,结合外部系统,进行事务提交操作
不管是在pre-commit或是commit阶段的任何一个节点没有成功,程序都会回滚到上一次成功的checkpoint的状态
5.1 两阶段提交代码实现
自定义实现两阶段提交逻辑,主要的步骤是继承TwoPhaseCommitSinkFunction这个抽象类,然后分别实现如下五个方法
- beginTransaction: 开启事务,这个方法中一般实现外部系统的一些初始化操作
- invoke: 每笔数据来都会调用的操作,一般实现数据的处理和写入操作
- preCommit: 预提交阶段,当barrier到达后,会调用该方法并调用beginTransaction方法开启一个新事务
- commit: 真正执行事务提交的方法,一般实现外部系统事务提交的方法
- abort: job一般报错或是出现异常调用的方法,一般实现外部系统事务回滚和事务终止操作