Flink系统知识讲解之:Flink内存管理详解
Flink系统知识讲解之:Flink内存管理详解
在现阶段,大部分开源的大数据计算引擎都是用Java或者是基于JVM的编程语言实现的,如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不用考虑底层,降低了程序员的门槛,JVM可以对代码进行深度优化,对内存资源进行管理,自动回收内存。但是自动内存管理的问题在于不可控,基于JVM的大数据引擎常常会面临一个问题,即在处理海量数据时,如何在内存中存储大量的数据。
自主内存管理
Flink从一开始就选择了自主的内存管理,避开了JVM内存管理在大数据场景下的问题,提升了计算效率。
1.JVM内存管理的不足
当需要将海量数据存储到内存中时,就不得不面对JVM存在的几个问题:
(1)有效数据密度低
Java的对象在内存中的存储包含3个主要部分:对象头、实例数据、对齐填充部分。32位和64位的虚拟机中对象头分别需要占32bit和64bit。实例数据时实际的数据存储。为了提高效率,内存中数据存储不是连续的,而是按照8 byte的整数倍进行存储。例如,只有一个boolean字段的类实例占16 byte:头信息8 byte,boolean 1 byte,为了对齐达到8的倍数会额外占用7 byte。这就导致在JVM中有效信息的存储密度很低。
(2)垃圾回收
JVM的内存回收机制的优点和缺点同样明显,优点是开发者无需关注资源回收问题,可以提高开发效率,减少内存泄漏的可能。但是内存回收是不可控的,在大数据计算的场景中,这个缺点被放大,TB、PB级的数据计算需要消耗大量的内存,在内存中产生海量的Java对象。一旦出现Full GC,GC会达到秒级甚至分钟级,直接影响执行效率。
GC带来的中断会使集群中的心跳超时,导致节点被踢出集群,整个集群进入不稳定状态。虽然通过JVM参数调优可以提升回收效率,尽量减少Full GC的发生,但是仍然不能避免这个问题,精确的调优也确实非常困难。
(3)OOM问题影响稳定性
OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的健壮性和性能都会受到影响。
(4)缓存未命中问题
CPU进行计算的时候,是从CPU缓存中获取数据,而不是直接从内存中获取数据。 CPU有分L1、L2、L3级缓存。L1小,一般为32KB,L3大,能达到32MB。缓存的理论基础是程序局部性原理,包括时间局部性和空间局部性:最近被CPU访问的数据,短期内CPU还要访问(时间);被CPU访问的数据附近的数据,CPU短期内还要访问(空间)。Java对象在堆上存储的时候并不是连续的,所以从内存中读取Java对象时,缓存的邻近的内存区域的数据往往不是CPU下一步计算所需要的,这就是缓冲未命中。此时CPU需要空转等待从内存中重新读取数据。CPU的速度和内存的速度之间差好几个数量级,导致CPU没有充分利用起来。如果数据没有在内存中,而是需要从磁盘上加载,那么执行效率就会变得惨不忍睹。
2.自主内存管理
因为直接使用JVM做内存管理在大数据场景下可能遇到的诸多问题,所以越来越多的大数据计算引擎选择自行管理JVM内存,如Spark、Flink、HBase,尽量达到C/C++一样的性能,同时避免OOM的发生。本章主要介绍Flink是如何解决上面的问题的,主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存等。
在Flink中,Java对象的有效信息被序列化为二进制数据流,在内存中连续存储,保存在预分配的内存块上,内存块叫做MemorySegment。MemorySegment是内存分配的最小单元,是一段固定长度的内存(默认大小为32KB)。同时,Flink为其提供了非常高效的读写方法,很多运算可以直接操作二进制数据,而不需要反序列化即可执行。
MemorySegment可以保存在堆上,其内部存储为一个Java byte数组,也可以保存在堆外的ByteBuffer中。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。
但使用堆上内存,仍然不是完全自主的内存管理,还存在以下问题:
1)超大内存(上百GB)JVM的启动需要很长时间,Full GC可以达到分钟级。使用堆外内存,可以将大量的数据保存在堆外,极大地减小堆内存,避免GC和内存溢出的问题。
2)高效的IO操作。堆外内存在写磁盘或网络传输时采用的是零拷贝,而堆上内存则至少需要1次内存复制。
3.堆外内存的不足之处
堆外内存提供了更好的性能和更可控的内存管理,但是也存在几个问题:
1)堆上内存的使用、监控、调试简单,堆外内存出现问题后的诊断则较为复杂。
2)Flink有时需要分配短生命周期的MemorySegment,在堆外内存上分配比在堆上内存开销更高。
3)在Flink的测试中,部分操作在堆外内存上会比堆上内存慢。
同时为了提供效率,Flink在计算中采用了DBMS的Sort和Join算法,直接操作二进制数据,避免数据反复序列化带来的开销。Flink的内部实现更像C/C++而非Java。
内存模型
内存布局
TaskManager是Flink中执行计算的核心组件,是用来运行用户代码的Java进程。其中,大量使用了堆外内存。
Flink TaskManager的简化和详细内存结构如下图所示:
简化内存模型:
详细内存模型:
基于文初提及的使用JVM堆上内存的一些不足之处,Flink设计了使用堆外内存的自主内存管理。因此,Flink任务进程的总内存(Total Process Memory, TPM)= Flink自身使用的内存(Total Flink Memory, TFM) + JVM运行额外的内存(如Metaspace、overhead)。其中,Flink自身使用的内存(TFM)包括了JVM堆内存和自主管理的堆外内存,堆外内存又包含了托管内存和直接内存。下面分别对这些分类进行介绍:
JVM Heap
Framework Heap
这部分内存主要由Flink框架自身使用,用于存储系统级别的数据结构,包括Flink框架在运行期间需要的一些数据结构,例如任务的线程栈内存和其他Flink框架的基础设施。例如用于JobManager和TaskManager的RPC消息、管理检查点的元数据等。它是作业执行所必需的基本内存,独立于用户程序和运行期间的数据存储。
Task Heap
这部分内存主要用于存储由用户函数创建的Java对象和用户函数操作的数据。例如,当执行一个map操作,您的函数可能会创建一些新的Java对象,这些对象都是在JVM堆内存中创建和管理的。如果Flink的托管内存配置为堆内,那么Flink的排序、哈希和状态后端操作也会使用到Task Heap内存。
Off-Heap
托管内存(Managed Memory)
托管内存是由 Flink 负责分配和管理的本地(堆外)内存。 以下场景需要使用托管内存:
- 流处理作业中用于 RocksDB State Backend。
- 流处理和批处理作业中用于排序、哈希表及缓存中间结果。
- 流处理和批处理作业中用于在 Python 进程中执行用户自定义函数。
更具体的,对于当作业中使用排序、哈希表及缓存中间结果时,Flink是如何使用托管内存的: - 排序:例如,当你需要对一个非常大的数据集进行排序时,如果数据无法完全装入内存,Flink 就会使用其托管内存来执行外排序。在外排序过程中,数据会被分割成可以装入内存的小块,每个小块内部进行排序,然后将排序后的小块写入磁盘。当所有小块都进行了排序和写入后,Flink 会从磁盘读取这些小块,执行归并排序,直到所有数据都被排序。
- 哈希表:Flink 在处理连接(Join)操作时,经常需要使用哈希表来维护到目前为止已经看到的数据记录。如果不能将所有数据装入内存,Flink 就会使用托管内存来存储这个哈希表。这样就可以保证即使在处理大规模数据时也能保持良好的性能。
- 缓存中间结果:在一些需要多遍扫描数据的算法(比如迭代算法)中,Flink 会缓存数据的中间结果,以便下一轮迭代可以重复使用,这样可以减少数据重复读取的开销。Flink 托管内存就是用来存储这些中间结果的。
另外,当使用 RocksDB 作为状态后端时,Flink 托管内存主要被用作 RocksDB 的写缓冲区(Write Buffer)和读缓存(Block Cache),从而提高状态访问的速度。
简言之,Flink 的托管内存主要用于存储在处理过程中需要存储的中间计算数据和结果,以求在充分利用有限内存资源的同时提供尽可能高的处理速度。
直接内存()
直接内存通常指的是被Flink进程直接从操作系统中申请的、不受Java堆内存垃圾回收器管理的内存。 以下场景需要使用直接内存:
- 于网络通信和文件I/O,
- 通过网络缓冲池进行数据交换(如shuffle)
- 数据缓冲以及序列化/反序列化中进行应用。
Flink通过直接内存技术进行数据交换可以有效避免频繁的Java堆内存和本地I/O缓存之间的数据复制(利用零拷贝技术),从而提高性能。
在一些情况下,直接内存也可以利用在某些需要大量内存并希望避免频繁触发垃圾回收的处理中,例如当使用RocksDB作为状态后端时,RocksDB的本地内存通常是由直接内存提供的,这样可以避免状态数据引起的Java堆内存的显著增加,从而降低了垃圾收集的开销和提高了性能。
直接内存包括了以下几部分:
- Framework Off-Heap:这部分内存被Flink框架用于框架自身的一些运行需求。比如,Flink的一些本地数据结构和算法可能会使用这部分内存进行操作。这部分内存一般不大。
- Task Off-Heap:这部分内存主要用于存储由用户任务产生的、并由Flink以某种形式管理的内存。比如,如果你配置了本地状态后端(如RocksDB)使用堆外内存,那么这部分内存将存储状态数据。这部分内存的使用可以避免引起频繁的Java GC操作,提高性能。
- Network:此部分内存主要用于网络通信中的缓冲区。Flink通过此缓冲区在TaskManager之间发送和接收数据。这部分内存通常是直接内存,不受GC的影响,可以有效地进行数据交换和缓冲以提高网络通信的性能。
另外,除了Flink使用的总内存(Total Flink Memory,TFM)外,总进程内存(Total Process Memory,TPM)还包括了JVM元空间(Metaspace)和其他开销内存(overhead)。在JVM内存模型中,将元空间从堆内存独立出来了,所以在上面的内存模型中也元空间也是单独一部分,外加一些JVM运行时的额外开销内存,例如线程栈、代码缓存、GC回收空间等等。
Flink内存模型分类配置参数
1.Flink使用的内存
(1)JVM堆上内存
- 框架堆上内存Framework Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆上内存,不计入slot的资源中。
配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。 - Task堆上内存Task Heap Memory。Task执行用户代码时所使用的堆上内存。
配置参数:taskmanager.memory.task.heap.size
(2)JVM堆外内存 - 框架堆外内存Framework Off-Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆外内存,不计入slot的资源。
配置参数:taskmanager.memory.framework.off-heap.size = 128MB,默认128MB。 - Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。
配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0. - 网络缓冲内存Network Memory。网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区(Network Buffer,后面回介绍)。
配置参数:taskmanager.memory.network.(min/max/fraction),默认min=64MB,max=1GB,fraction=0.1。 - 堆外托管内存 Managed Memory。Flink管理的堆外内存。
配置参数:taskmanager.memory.managed.[size|fraction],默认fraction = 0.4。
2.JVM本身使用的内存
JVM本身直接使用了操作系统的内存。
- JVM元空间
JVM元空间所使用的内存
配置参数:taskmanager.memory.jvm-metaspace=96M,默认96MB。 - JVM执行开销
JVM在执行时自身所需要的内容,包括线程栈、IO、编译缓存等所使用的内存。
配置参数:taskmanager.memory.jvm-overhead.[min|max|fraction]。默认min=192MB,max=1GB,fraction=0.1。 - 总体内存
(1)Flink使用内存
综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size参数进行控制。
(2)进程使用内存
整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。
JVM内存控制参数如下所示:
1)JVM堆上内存,使用-Xmx和-Xms参数进行控制。
2)JVM直接内存,使用参数-XX:MaxDirectMemorySize进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受该参数控制。
3)JVM Metaspace使用-XX:MaxMetaspaceSize进行控制。
内存计算
目前的实现中,在JVM启动之前就需要确定各个内存区块的大小。一旦JVM启动了,在TaskManager进程内部就不再重新计算。Flink中有两个地方进行内存大小计算:
- 在Standalone部署模式下,内存的计算在启动脚本中实现。
- 在容器环境下(Yarn、K8s、Mesos),计算在ResourceManager中进行。
在启动脚本与容器环境下的内存大小计算都调用了Flink的Java代码时间,保证了所有部署模式下的统一,计算好的参数使用-D参数提交给Java进程。
计算时,需要配置如下3个参数组合中的至少1个:
(1)Task的堆上内存和托管内存
如果手动配置了网络缓冲区内存大小,则使用该参数。如果没有明确配置,则使用分配系数fraction ✖️总体Flink使用内存计算网络缓冲区内存大小。
(2)总体Flink使用内存
如果配置了该选项,而没有配置(1),则从整体Flink内存中划分网络缓冲区内存和托管内存,剩余的内存作为Task堆上内存。
如果手动设置了网络缓冲内存,则使用其值,否则使用默认的分配系统fraction✖️总体Flink内存。
(3)总体进程使用内存
如果只配置了总体进程使用内存,则从整体进程中扣除JVM元空间和JVM执行开销内存,剩余的内存作为总体Flink使用内存。
内存数据结构
Flink的内存管理像操作系统管理内存一样,将内存划分为内存段、内存页等结构。
内存段
内存段在Flink内存叫做MemorySegment,是Flink的内存抽象的最小分配单元。 默认情况下,一个MemorySegment对应着一个32KB大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer)。
MemorySegment同时也提供了堆二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数据byte[]后写入。
MemorySegment结构
为了更清晰地理解MemorySegment,下面一起看一下MemorySegment的关键属性。
1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。
2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是就是Big Endian模式。
3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用堆外内存,则为null。
4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为堆外内存的绝对地址)。
5)addressLimit:标识地址结束位置(address+size)。
6)size:内存段的字节数。
结构类图如下所示:
可以看到,MemorySegment类定义了一系列的方法来对字节和基本数据类型,如int、long、float进行读写的方法(例如get和put方法)。同时,还支持批量操作,例如复制和比较操作,它提供了copy和compare方法用来对大量数据进行操作。
Flink的MemorySegment主要用于Flink框架对内存的管理和数据的处理,主要用在它的网络缓冲、排序算法和内存状态后端等地方,以提供高效的内存操作。在设计上,MemorySegment主要实现了以下几点:
- 高效的数据访问。 MemorySegment类包含了一个连续的字节数组(heapMemory),用于存储实际的二进制数据。所有的get和put操作都是在这个字节数组上执行的。使用连续的字节数组的好处正是我们上文提到的,可以充分利用程序的局部性原理,因此,Flink使用MemorySegment作为其最小的内存分配单元,保证了读写数据时,相邻的数据能够一起被加载到CPU缓存中,提升处理性能。
- 高效的内存管理。 Flink通过MemorySegment对内存进行管理,保证了Flink程序运行时的内存效率。例如,对于Flink的网络缓冲、排序算法和内存状态后端等地方,都会使用MemorySegment进行内存的分配和回收。这有助于Flink高效地使用内存,而且避免了一些Java内存管理中常见的问题,如垃圾收集(Garbage Collection)过频繁等。
- **支持堆外内存(off-heap memory)操作。**这意味着,除了在JVM堆内存上操作,MemorySegment还能直接在系统的物理内存上进行操作。使用离堆内存可以避免频繁的垃圾回收,提高数据处理的性能。
另外,需要注意的是,MemorySegment抽象类中的heapMemory仅适用于“堆”内存段,即哪些将数据存储在Java堆上的内存段。对于“非堆”内存段,即哪些将数据存储在Java堆之外的内存段,Flink使用java.nio.DirectByteBuffer的字节缓冲区(定义在HybridMemorySegment类中)来存储和操作数据。
最后再简要谈一下MemorySegment,如果觉得理解起来比较抽象的话,可以跟其他的一些数据结构类如ArrayList、LinkedList一起来对比理解,这些类都是定义的用来表示数据如何在内存中存储和管理的。ArrayList是划分一块连续的内存地址,LinkedList是用链表的结构来存储,而MemorySegment就是划分一块指定大小的连续内存地址来存储字节数据。上层的模块可以直接对MemorySegment进行操作,就相当于对平时对ArrayList、LinkedList这些结构的操作(比如插入、排序、比较等)。因此,MemorySegment就是Flink定义的一种数据结构,用来方便地存储、管理和操作内存数据。
字节顺序Big Endian和Little Endian
字节顺序是指字节类型的数据在内存中的存放顺序。不同的CPU架构体系使用不同的存储顺序。PowerPC系统采用Big Endian方式存储数据,低地址存放最高有效字节(MSB),而x86系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB),如下图所示:
MemorySegment实现
Flink的MemorySegment有堆上和堆外两种实现,其类体系结构如图所示:
HeapMemorySegment用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。
之所以在后续的版本中只使用HybridMemorySegment,涉及了JIT编译优化的问题。如果同时使用了两个类,那么在运行的时候,每一次调用都需要去查询函数表,确定调用哪个子类中的方法,无法提前优化。但是如果只使用一个类,那么JIT编译时,自动识别方法的调用都可以被去虚拟化(de-virtualized)和内联(inlined),可以极大地提高性能。调用越频繁,优化效果就越好。
内存页
MemorySegment是Flink内存分配的最小单元,对于跨MemorySegment保存的数据,如果需要上层的使用者,需要考虑所有的细节,非常繁琐。所以Flink又抽象了一层,叫做内存页。内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。 有了这一层,上层使用者无需关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
DataInputView
DataInputView是从MemorySegment数据读取的抽象视图,该视图可用于顺序读取内存的内容。继承自java.io.DataInput,提供了从二进制流中读取不同数据类型的方法。如下图所示:
InputView中持有多个MemorySegment的饮用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。DataInputView主要提供了一系列接口用于从数据输入流中读取数据,而MemorySegment则主要用于在连续的内存块上进行数据的低层次操作。
在Flink的网络缓冲,排序,哈希表等操作中,MemorySegment用作持有真实数据的内存块。而DataInputView则提供了读取这些数据的接口,以方便地从MemorySegment读取所需的数据。
在实际的数据编解码过程中,常常需要将DataInputView与MemorySegment一起使用。例如,一个典型的使用场景是在网络数据传输中,Flink会首先将数据保存在MemorySegment中,然后通过实现DataInputView的方式,来进行数据的读取和解码。
基本上所有的InputView实现类都继承了AbstractPageInputView抽象类,也就是所所有的InputView实现类都支持Page。
DataOutputView
DataOutputView是数据写入MemorySegment的抽象视图,继承自java.io.DataOutput,提供了将不同类型的数据写入二进制流的一系列方法。同样,DataOutputView中持有一个或者多个MemorySegment的引用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。
DataOutputView的接口继承关系如图所示:
在实际的数据编码和写入过程中,Flink通常会利用一个DataOutputView的实现将数据写入一个或多个MemorySegment。例如,在网络数据发送时,Flink会通过实现DataOutputView的方式,将数据写入MemorySegment,然后将这些MemorySegment添加到网络缓冲区以准备发送。
基本上所有的OutputView实现类都继承了AbstractPageOutputView抽象类,也就是说所有的OutputView实现类都支持跨MemorySegment写入。
内存页的使用
对内存的读取写入操作是非常底层的行为,对于上层应用(DataStream作业)而言,涉及向MemorySegment写入,读取二进制的地方都使用到了DataOutputView和DataInputView,而不是直接使用MemorySegment。
例如,在flink-table-runtime-blink中,BinaryRowSerializer中使用AbstractPagedInputView从MemorySegment中读取二进制数据并转换成BinaryRow,使用AbstractPagedOutputView将BinaryRow写入MemorySegment中。
Buffer
Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。 Flink在各个TaskManager之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了1个MemorySegment,实际的数据就存储在这个MemorySegment中,并引入了一些额外的元数据,例如数据大小(currentSize属性)以及Buffer中包含的数据类型(dataType属性)等。
此外,Buffer还提供了内存的引用计数和递增/递减的方法,用于在资源回收时管理内存。
简单来说,Buffer是基于MemorySegment的,它在MemorySegment上增加了一些用于网络传输和内存管理的额外功能。
Buffer接口的类体系如图所示:
Buffer的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了“引用数”的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。
具体来说,在Apache Flink中,Buffer对象具有一个“引用数(Reference Count)”的属性,它是用来跟踪Buffer实例在系统中被多少组件引用的指标。每当一个组件获取对Buffer的引用时,引用数就会增加。当组件完成对Buffer的使用并且不再需要它时,就会减少引用数。
这种设计的目的是为了更好地管理和复用内存资源。当Buffer的引用数降为0,就表示没有任何组件再使用该Buffer,它的内存可以归还给MemorySegment池,以便其他组件复用。引用数在内存管理中是一种常见的机制,能够避免不必要的对象复制和频繁的内存分配和释放,在Flink的Buffer管理中起到了重要的作用。例如,在数据移交过程中,可能有多个线程或模块同时处理同一个Buffer,此时通过引用数可以准确判断什么时候可以安全地释放该Buffer。
NetworkBuffer同时继承了AbstractReferenceCountedByteBuf。
AbstractReferenceCountedByteBuf是Netty中的抽象类,通过继承该类,Flink中Buffer具备了引用计数的能力,并且实现了对MemorySegment的读写。感兴趣的读者可以去了解一下Netty。
Buffer资源池
Buffer资源池在Flink中叫做BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,其实现类是LocalBufferPool。
BufferPool的类体系如图所示:
为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供BufferPod的创建和销毁,其唯一的实现类是NetworkBufferPool。
每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferPool,为其分配内存。
NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。
内存管理器
**MemoryManager是Flink中管理托管内存的组件,其管理的托管内存只使用堆外内存。**在批处理中用在排序、Hash表和中间结果的缓冲中,在流计算中作为RocksDBStateBackend的内存。
在Flink 1.10之前的版本中,MemoryManager负责TaskManager的所有内存。1.10版本中,MemoryManager的管理范围缩小为Slot级别,即为Task管理内容,TaskManager为每个Slot分配相同的内容,Task不能使用超过其Slot分配的资源。
MemoryManager主要通过内部接口MemoryPool来管理所有的MemorySegment。托管内存的管理相比于Network Buffers的管理更为简单,因为不需要Buffer的那一层封装。
内存申请
批处理计算任务中,MemorySegment负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer,代码如下所示,
# MemorySegmentFactory类
/**
* Allocates an off-heap unsafe memory and creates a new memory segment to represent that
* memory.
*
* <p>Creation of this segment schedules its memory freeing operation when its java wrapping
* object is about to be garbage collected, similar to {@link
* java.nio.DirectByteBuffer#DirectByteBuffer(int)}. The difference is that this memory
* allocation is out of option -XX:MaxDirectMemorySize limitation.
*
* @param size The size of the off-heap unsafe memory segment to allocate.
* @param owner The owner to associate with the off-heap unsafe memory segment.
* @param gcCleanupAction A custom action to run upon calling GC cleaner.
* @return A new memory segment, backed by off-heap unsafe memory.
*/
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable gcCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
Runnable cleaner =
MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);
// 在申请内存的时候,同时为该内存片段准备好内存清理
return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
}
使用Unsafe申请堆外内存,包装为ByteBuffer后再包装为MemorySegment。
流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache和WriterBufferManager参数来限制,参数的具体值从TaskManager的内存配置参数中计算而来。RocksDB自己来负责运行过程中的内存申请和内存释放,如下述代码所示:
/**
* Acquires a shared resource, identified by a type string. If the resource already exists, this
* returns a descriptor to the resource. If the resource does not yet exist, the method
* initializes a new resource using the initializer function and given size.
*
* <p>The resource opaque, meaning the memory manager does not understand its structure.
*
* <p>The OpaqueMemoryResource object returned from this method must be closed once not used any
* further. Once all acquisitions have closed the object, the resource itself is closed.
*/
public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(
String type, LongFunctionWithException<T, Exception> initializer, long numBytes)
throws Exception {
// This object identifies the lease in this request. It is used only to identify the release
// operation.
// Using the object to represent the lease is a bit nicer safer than just using a reference
// counter.
final Object leaseHolder = new Object();
final SharedResources.ResourceAndSize<T> resource =
sharedResources.getOrAllocateSharedResource(
type, leaseHolder, initializer, numBytes);
// 创建资源释放函数
final ThrowingRunnable<Exception> disposer =
() -> sharedResources.release(type, leaseHolder);
return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
}
内存释放
Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种:
- 内存使用完毕
- Task停止(正常或异常)执行。
在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个Java Cleaner。
LegacyCleanerProvider
该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sum.misc.Cleaner来释放内存。
Java9CleanerProvider
该CleanerProvider提供1.9及以上版本的JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。
JavaGcCleanerWrapper会为每个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。
当MemoryManager关闭的时候会对所有申请的MemorySegment进行释放,交还给操作系统。
网络缓冲器
网络缓冲器(Network Buffer)是网络交换数据的封装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。
BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer。BufferConsumer位于下游Task中,负责从MemorySegment中读取数据。1个BufferBuilder对应1个BufferConsumer。
内存申请
LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数据之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数量总和,则会发生错误。
Buffer申请
结果分区(ResultPartition)申请Buffer进行数据写入时,如下代码所示:
LocalBufferPool首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBuffer中申请,如果没有,则阻塞等待可用的MemorySegment,如下代码所示:
MemorySegment申请
申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment,如下代码若是:
内存回收
Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数+1,每个Buffer被消费完了,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。
Buffer回收
前面介绍过Buffer的主要实现类是NetworkBuffer,同时继承了AbstractReferenceCountedByteBuf.。当Buffer被消费一次后,就会对Buffer的引用计数-1,如下代码所示:
Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager的级别内存不足,才会释放回TaskManager持有的全局资源池。
释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。
MemorySegment释放
当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。
总结
大数据场景下,使用Java的内存管理会带来一系列的问题,所以Flink从一开始就选择自主内存管理。为了实现内存管理,Flink对内存进行了一系列的抽象,内存段MemorySegment是最小的内存分配单元,对于跨段的内存访问,Flink抽象了DataInputView和DataOutputView,可以看作是内存页。
Flink在1.10版本重构了其TaskManager的内存管理模型,主要分为堆上内存和堆外内存,并简化了内存参数。在计算层面上,Flink的内存管理器提供了对内存的申请和释放,在数据传输层面上,Flink抽象了网络内存缓存Buffer(1个Buffer对应一个MemorySegment)的申请和释放。