Flink (三):核心概念(并行度、算子链、任务槽)
1. 作业提交
Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run ...
中运行。
可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。
1.1 JobManager
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
-
ResourceManager: ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
-
Dispatcher: Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
-
JobMaster:JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby。
1.2 TaskManager
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子
2. 并行度
一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。如图中Source 和Map 设置为了一个Task, 这个Task 的并行度为2 ,故存在两个sub_task。
2.1 设置并行度
一个 task 的并行度可以从多个层次指定:
2.1.1 算子层次
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
每个算子后,都可以进行并行度设置,这时的并行度设置只会应用到设置的对应算子
2.1.2 执行环境层次
Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
可以通过调用 setParallelism()
方法指定执行环境的默认并行度。如果想以并行度3
来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();
env.execute("Word Count Example");
这时设置的并行度对运行程序中,所有算子生效
2.1.3 客户端层次
将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。在 CLI 客户端中,可以通过 -p
参数指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Java/Scala 程序中,可以通过如下方式指定并行度:
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
2.1.4 系统层次
可以通过设置 Flink 配置文件中的 parallelism.default
参数,在系统层次来指定所有执行环境的默认并行度。可以通过参考官方配置文档获取更多细节。
这几个层次的优先级 算子层次 >> 执行环境层次 >> 客户端层次 >> 系统层次
2.2 设置最大并行度
最大并行度在每个作业和每个算子粒度上进行设置,决定了有状态算子能够扩展的最大并行度。目前,作业启动后无法改变算子的最大并行度,除非丢弃该算子的状态。设置最大并行度的原因,区别于允许有状态算子无限扩展,是因为它会对应用程序的性能和状态大小产生影响。Flink需要维护特定的元数据来支持重新调整状态的能力,而这些元数据随着最大并行度的增加而线性增长。通常,您应该选择一个足够高的最大并行度,以满足未来的可扩展性需求,同时保持它足够低,以维持合理的性能。
最大并行度和并行度的主要区别在于:并行度决定了算子处理数据的sub_task 数目,但是对于有状态的算子,除了需要对应的处理数据,还需要额外的处理状态,那么这里最大并行度就是限制有状态算子的最大的sub_task的数目。
最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism()
方法修改并行度相似,你可以通过调用 setMaxParallelism()
方法来设定最大并行度。
默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2)
值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2
的幂次方,注意默认最大并行度下限为 128
,上限为 32768
。
注意:为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。从之前的作业恢复时,改变该作业的最大并行度将会导致状态不兼容。
3. 算子链
将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
但是构建算子链是有一定条件的:
-
上下游算子实例处于同一个 SlotSharingGroup 中
-
两个算子间的物理分区逻辑是 ForwardPartitioner
-
两个算子间的 shuffle 方式不是批处理模式
-
上下游算子实例的并行度相同
3.1 创建新链
基于当前算子创建一个新的算子链。后面两个 map 将被链接起来,而 filter 和第一个 map 不会链接在一起。
someStream.filter(...).map(...).startNewChain().map(...);
3.2 禁止链接
禁止和 map 算子链接在一起。
someStream.map(...).disableChaining();
3.3 配置 Slot 共享组
为某个算子设置 slot 共享组。Flink 会将同一个 slot 共享组的算子放在同一个 slot 中,而将不在同一 slot 共享组的算子保留在其它 slot 中。这可用于隔离 slot 。如果所有输入算子都属于同一个 slot 共享组,那么 slot 共享组从将继承输入算子所在的 slot。slot 共享组的默认名称是 “default”,可以调用 slotSharingGroup(“default”) 来显式地将算子放入该组。
someStream.filter(...).slotSharingGroup("name");
4. 任务槽
每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。
每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。
通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。
默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:
-
Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
-
容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。
5. 并行度和任务槽关系
- Flink 在执行作业时,根据作业的并行度来决定需要多少个 slot。如果一个算子的并行度为 4,那么它将需要 4 个 slot 才能同时执行这 4 个并行任务。
- 作业的并行度和 TaskManager 的可用 slot 数量直接决定了作业的执行情况。如果集群中没有足够的 slot,作业可能会等待调度。如果是standalone 会话模式启动flink集群,则并行度 > slot 数目则会报错,
如:我们standalone 会话模式在本地启动flink 集群
此时可以发现,slot 数目为1,当我们在web界面提交作业设置并行度是2 时,就会出现如下报错,提示资源不足。
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:226)
... 40 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
... 38 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
如果是使用YARN,则不会出现这种情况,YARN会根据任务所需要的情况,启动对应数目的TaskManager 来保证作业的稳定运行