Flink 内存管理
一、内存模型
上图是一个 Flink 程序进程总体的内存模型,其包含 Flink 使用内存、JVM 元空间以及 JVM 开销。
- Flink 使用了堆上内存和堆外内存;
- 框架内存使用了堆上内存和堆外内存的直接内存;
- Task 使用堆上内存和堆外内存的直接内存;
- 管理内存、JVM 元空间以及 JVM 内存开销使用了堆外内存;
- 网络内存使用了堆外内存;
概念解释:
JVM Heap:JVM 堆上内存
Framework Heap Memory:Flink 框架本身使用的内存,不计入 Slot 资源中;
Task Heap Memory:Task 执行用户代码时所使用的堆上内存;
Off-Heap Memory:JVM 堆外内存
Direct Memory:JVM 直接内存
Framework Off-Heap Memory:Flink 框架本身所使用的堆外内存,不计入 Slot 资源;
Task Off-Heap Memory:Task 执行用户代码所使用的堆外内存;
Network Memory:网络数据交换所使用的对外内存大小,如网络交换缓冲区;
Managed Memory:Flink 管理的堆外内存,用于管理排序、哈希表、缓存中间结果以及 RocksDB State Backend 的本地内存;
Flink 使用内存 = 框架堆内和堆外内存 + Task 堆内和堆外内存 + 网络内存 + 管理内存
进程内存 = Flink 使用内存 + JVM 本身使用的内存
1.1、JobManager 内存模型
源码在 JobManagerFlinkMemory.java
配置在 flink-conf.yaml 中的 jobmanager.memory.process.size:
1.2 TaskManager 内存模型
源码在 TaskExecutorFlinkMemory.java
配置在 flink-conf.yaml 中的 taskmanager.memory.flink.size:
1.3 内存分配
1.3.1、JobManager 内存分配
调用 YarnClusterDescriptor.java 中的 startAppMaster()方法:
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
... ...
final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration,
JobManagerOptions.TOTAL_PROCESS_MEMORY);
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasKrb5,
processSpec);
... ...
}