Flink-DataStreamAPI-执行模式
一、概览
DataStream API支持不同的运行时执行模式,我们可以根据用例的要求和作业的特征进行选择。
STREAMING
执行模式:被称为“经典”执行模式为,主要用于需要持续增量处理并且预计无限期保持在线的无界作业BATCH
执行模式:类似于MapReduce的批处理框架,主要用于已知固定输入且不连续运行的有界作业。- AUTOMATIC执行模式:交给Flink自己决断,如果所有源都有界,Flink将选择BATCH,否则选择STREAMING
Flink对流和批次处理作业的统一方法意味着,无论配置的执行模式如何,在有界输入上执行的DataStream应用程序都将产生相同的最终结果。请务必注意最终的含义:在STREAMING
模式下执行的作业可能会产生增量更新(想想数据库中的upserts),而BATCH
作业最终只会产生一个最终结果。如果解释正确,最终结果将是相同的,但到达那里的方式可能不同
当启用BATCH
执行,我们允许Flink应用额外的优化,只有当我们知道我们的输入是有界的时候,我们才能这样做。例如,除了允许更有效的任务调度和故障恢复行为的不同洗牌实现之外,还可以使用不同的连接/聚合策略。
二、我什么时候可以/应该使用BATCH执行模式?
只有有界的作业/Flink程序才能使用BATCH执行模式。有界是数据源的一个属性,它告诉我们来自该源的所有输入在执行之前是否已知,或者新数据是否会无限期地出现。反过来,如果一个作业的所有源都有界,则该作业是有界的,否则是无界的。
STREAMING执行模式可以用于有界和无界作业
根据经验,当程序有界时,应该使用BATCH执行模式,因为这会更有效。当程序无界时,则必须使用STREAMING执行模式,因为只有这种模式足够通用,能够处理连续的数据流。
三、配置BATCH执行模式
执行模式可以通过execution.runtime-mode
设置进行配置。有三个可能的值:STREAMING、BATCH、AUTOMATIC
这可以通过bin/flink run ...
的命令行参数进行配置,或者在创建/配置StreamExecutionEnvironment
时以编程方式进行配置。例如:
命令行:
bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议不要在程序中设置运行时模式,而是在提交应用程序时使用命令行设置。保持应用程序代码无配置允许更大的灵活性,因为同一应用程序可以在任何执行模式下执行
四、执行行为
1、任务调度和网络洗牌
Flink作业由在数据流图中连接在一起的不同操作组成。系统决定如何安排这些操作在不同进程/机器(TaskManager)上的执行,以及如何在它们之间洗牌(发送)数据。
可以使用称为链接的功能将多个操作/运算符链接在一起。Flink认为作为调度单元的一组一个或多个(链接的)运算符称为任务。术语子任务通常用于指代在多个TaskManager上并行运行的任务的各个实例,但我们在这里只使用术语任务。
任务调度和网络洗牌在BATCH和STREAMING执行模式下的工作方式不同。主要是因为我们知道我们的输入数据在BATCH执行模式下是有界的,这允许Flink使用更有效的数据结构和算法。
我们将用这个例子来解释任务调度和网络传输的区别:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
和Spark一样,数据之间关系是1对1、多对1关系的,Flink通常不会在它们之间插入网络洗牌。例如:map()
, flatMap()
, filter()。
诸如keyBy()或re平衡()之类的操作需要在任务的不同并行实例之间洗牌数据。这会导致网络洗牌。
对于上面的示例,Flink将操作组合为如下任务:
- Task1:
source
,map1
,map2
- Task2:
map3
,map4
- Task3:
map5
,map6
,sink
Task1和Task2以及Task2和Task3之间有一个网络洗牌:
STREAMING执行模式
在流式执行模式下,所有任务都需要一直在线/运行。这允许Flink立即通过整个管道处理新记录,这是我们连续和低延迟流处理所需要的。这也意味着分配给作业的TaskManager需要有足够的资源来同时运行所有任务。
网络洗牌是流水线式的,这意味着记录立即发送到下游任务,并在网络层进行一些缓冲。同样,这是必需的,因为在处理连续的数据流时,没有自然的时间点(在时间上)可以在任务(或任务管道)之间实现数据。这与BATCH执行模式形成鲜明对比,后者可以实现中间结果。
BATCH执行模式
在BATCH执行模式下,作业的任务可以分成可以一个接一个执行的阶段。我们可以这样做,因为输入是有界的,因此Flink可以在进入下一个阶段之前完全处理管道的一个阶段。在上面的示例中,作业将有三个阶段,对应于由洗牌屏障分隔的三个任务。
分阶段处理需要Flink将任务的中间结果物化到一些非短暂的存储中,这允许下游任务在上游任务已经脱机后读取它们,而不是像上面解释的那样立即向下游任务发送记录。这将增加处理的延迟,但也带来了其他有趣的属性。一方面,这允许Flink在发生故障时回溯到最新的可用结果,而不是重新启动整个作业。另一个副作用是BATCH作业可以在更少的资源上执行(就TaskManager的可用槽而言),因为系统可以一个接一个地顺序执行任务。
TaskManager将保留中间结果,至少只要下游任务没有消耗它们。(从技术上讲,它们将被保留,直到消耗管道区域产生它们的输出。)之后,只要空间允许,它们将被保留,以便在失败的情况下允许上述回溯到早期结果。
StateBackend
在STREAMING模式下,Flink使用StateBackend来控制状态的存储方式以及检查点的工作方式。
在BATCH模式下,配置的StateBackend被忽略。相反,键控操作的输入按键分组(使用排序),然后我们依次处理一个键的所有记录。这允许同时只保留一个键的状态。当移动到下一个键时,给定键的状态将被丢弃。
处理顺序
BATCH和STREAMING执行之间在运算符或用户定义函数(UDF)中处理记录的顺序可能不同。
在STREAMING模式下,用户定义的函数不应对传入记录的顺序做出任何假设。数据一到达就会被处理。
在BATCH执行模式下,有一些操作是Flink保证顺序的。排序可以是特定任务调度、网络洗牌和StateBackend(见上文)的副作用,也可以是系统有意识的选择。
我们可以区分三种一般类型的输入:
- 广播输入:来自广播流的输入
- 常规输入:既不是广播也不是键控的输入
- 键控输入:来自KeyedStream的输入
使用多种输入类型的函数或运算符将按以下顺序处理它们:
- 首先处理广播输入
- 接着处理常规输入
- 最后处理键控输入
对于从多个常规或广播输入中使用的函数(例如CoProcessFunction),Flink有权以任何顺序处理来自该类型的任何输入的数据。
对于从多个键控输入中使用的函数(例如KeyedCoProcessFunction),Flink在继续下一个键控输入之前,会处理来自所有键控输入的单个键的所有记录。
事件时间/水印
在支持事件时间方面,Flink的流运行时建立在事件可能乱序的悲观假设之上,即时间戳为t的事件可能发生在时间戳为t+1的事件之后。正因为如此,系统永远无法确定给定时间戳T的时间戳为t<T的元素将来不会再出现。为了在使系统实用的同时摊销这种乱序对最终结果的影响,在STREAMING模式下,Flink使用了一种称为水印的启发式方法。带有时间戳T的水印表示没有时间戳为t<T的元素会跟随。
在BATCH模式下,输入数据集是预先知道的,不需要这样的启发式方法,因为至少可以按时间戳对元素进行排序,以便按时间顺序处理。因此在BATCH中,我们可以假设“完美水印”。
鉴于上述情况,在BATCH模式下,我们只需要在与每个键关联的输入末尾MAX_WATERMARK,或者如果输入流没有键控,则在输入末尾。基于此方案,所有注册的计时器将在时间结束时触发,用户定义的WatermarkAssigners或WatermarkGenerator将被忽略。不过,指定WatermarkStrategy仍然很重要,因为它的TimestampAssigner仍将用于为记录分配时间戳。
处理时间
处理时间是机器上处理记录的挂钟时间,在该记录正在被处理的特定实例中。根据这个定义,我们看到基于流转时长的计算结果是不可重现的。这是因为处理两次的同一记录将有两个不同的时间戳。
尽管如此,在流转时长模式下使用流转时长还是很有用的。原因与流转管道经常实时摄取其无界输入有关,因此事件时间和流转时长之间存在相关性。此外,由于上述原因,在流转模式下,事件时间中的1h通常可以在流转时长或挂钟时间中接近1h。因此,使用流转时长可以用于早期(不完整)触发,从而给出预期结果的提示。
在输入数据集是静态的并且事先已知的批处理世界中不存在这种相关性。因此,在BATCH模式下,我们允许用户请求当前流转时长并注册流转时长计时器,但是,就像事件时间一样,所有计时器都将在输入结束时触发
从概念上讲,我们可以想象流转时长在作业执行期间不会提前,我们快进到处理整个输入的时间结束。
故障恢复
在STREAMING执行模式下,Flink使用检查点进行故障恢复。也可以通过状态快照进行容错的更介绍性部分。
故障恢复检查点的特点之一是Flink将在发生故障时从检查点重新启动所有正在运行的任务。这可能比我们在BATCH模式下必须做的事情更昂贵(如下所述),这也是如果您的作业允许,您应该使用BATCH执行模式的原因之一。
在BATCH执行模式下,Flink将尝试并回溯到中间结果仍然可用的先前处理阶段。潜在地,只有失败的任务(或它们在图中的前身)必须重新启动,与从检查点重新启动所有任务相比,这可以提高处理效率和作业的整体流转时长。
2、重要参考因素
与经典的STREAMING执行模式相比,在BATCH模式下,某些功能可能无法按预期工作。某些功能的工作方式略有不同,而其他功能不受支持。
BATCH模式下的行为改变:
Rolling”操作(例如reduce()或sum())会为以STREAMING模式到达的每条新记录发出增量更新。在BATCH模式下,这些操作不是“滚动”。它们只发出最终结果。
BATCH模式下不支持:
检查点和任何依赖于检查点的操作都不起作用
自定义运算符应该小心实现,否则它们可能会行为不当。
检查点
如上所述,批处理程序的故障恢复不使用检查点。想想Spark是如何做的呢?利用RDD的血统,按stage来进行失败重试的,因为每个stage最后都会落盘。
重要的是要记住,因为没有检查点,某些功能,如Checkpoint Listener,因此Kafka的EXACTLY_ONCE模式或File Sink的OnCheckpointRollingPolicy将不起作用。
您仍然可以使用所有状态原语,只是用于故障恢复的机制会有所不同。
编写自定义运算符
注意:自定义运算符是Apache Flink的高级使用模式。对于大多数用例,请考虑改用(keyed-)进程函数。
在编写自定义运算符时,记住对BATCH执行模式所做的假设非常重要。否则,适用于流式传输模式的运算符可能会在BATCH模式下产生错误的结果。运算符永远不会限定为特定键,这意味着他们会看到Flink试图利用的BATCH处理的某些属性。
首先,您不应该在运算符中缓存最后看到的水印。在BATCH模式下,我们逐个键处理记录。因此,水印将在每个键之间从MAX_VALUE切换到MIN_VALUE。您不应该假设水印在运算符中总是升序的。出于同样的原因,计时器将首先按键顺序触发,然后按每个键内的时间戳顺序触发。此外,不支持手动更改键的操作。
------------------------------------------------------------------------------------------------------------------------------
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
2025年人工智能、数字媒体技术与社会计算国际学术会议
https://ais.cn/u/byAVfu
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025)
https://ais.cn/u/77FJ3u
2025人工智能与计算机网络技术国际学术会议(ICAICN 2025)
https://ais.cn/u/jUfAVz
2025年数据挖掘与项目管理国际研讨会
https://ais.cn/u/nIbMvm