Apache Flink
Apache Flink
- 1 Flink的特点
- 2 流式计算架构
- 2.1 Lambda架构
- 2.2 Kappa架构
- 2.3 IOTA架构
- 3 Flink部署
- 3.1 Standalone模式
- 3.2 Standalone-HA模式
- 3.3 Flink on Yarn模式
- 3.3.1 Session mode
- 3.3.2 Per-Job mode
- 3.3.3 Application mode
- 4 Flink运行时的组件
- 4.1 JobManager(作业管理器)
- 4.1.1 ResourceManager(资源管理器)
- 4.1.2 Dispatcher(分发器)
- 4.1.3 JobMaster
- 4.2 TaskManager(任务管理器)
- 5 Flink任务提交流程
- 5.1 Standalone模式
- 5.2 Flink on Yarn模式
- 6 Flink的重要概念
- 6.1 DataFlow逻辑数据流
- 6.2 ExecutionGraph执行图
- 6.2 TaskSlot任务槽
- 6.3 Slot Sharing槽共享
- 6.4 Parallelism并行度
- 6.5 Task
- 6.6 SubTask
- 6.7 Partition
- 6.8 数据传输形式
- 6.8.1 One-to-one
- 6.8.2 Redistributing
- 6.9 Operator Chain
- 7 Transformation
- 7.1 基本操作map、flatMap、filter
- 7.2 滚动聚合、reduce聚合
- 7.3 分流操作
- 7.4 分区和自定义分区
- 8 Flink四大基石
- 8.1 Window窗口
- 8.1.1 Window窗口的应用场景
- 8.1.2 时间窗口(Time Winodow)
- 8.1.2.1 滚动时间窗口
- 8.1.2.2 滑动时间窗口
- 8.1.2.3 会话窗口
- 8.1.3 计数窗口(Count Window)
- 8.1.3.1 滚动计数窗口
- 8.1.3.2 滑动计数窗口
- 8.2 Time
- 8.2.1 Flink中的时间语义
- 8.2.2 Watermark(水位线)
- 8.2.2.1 Watermark的特点
- 8.2.2.2 Watermark的传递
- 8.2.2.3 Watermark示例
- 8.3 State
- 8.3.1 Operatior State(算子状态)
- 8.3.2 Keyed State(键控状态)
- 8.3.2.1 Value state(值状态)
- 8.3.2.2 List state(列表状态)
- 8.3.2.3 Map state(映射状态)
- 8.3.2.4 Reducing state(聚合状态)
- 8.3.2.5 Aggregating state(聚合状态)
- 8.3.2.6 状态有效期(TTL)
- 8.4 CheckPoint
- 8.4.1 State Backends(状态后端)
- ~~8.4.1.1 MemoryStateBackend~~
- ~~8.4.1.2 FsStateBackend~~
- 8.4.1.3 EmbeddedRocksDBStateBackend
- 8.4.1.4 HashMapStateBackend
- 8.4.2 Checkpoint使用方式
- 8.4.3 Checkpoint执行原理
- 8.4.3.1 Stream barrier
- 8.4.3.2 对齐检查点
- 8.4.3.3 未对齐的检查点
- 8.4.4 自动重启策略
- 8.4.4.1 Fixed Delay Restart Strategy(固定延时重启策略)
- 8.4.4.2 Failure Rate Restart Strategy(故障率重启策略)
- 8.4.4.3 No Restart Strategy(无重启策略)
- 8.4.4.4 Fallback Restart Strategy(群集定义的重启策略)
- 8.4.5 保存点(Savepoints)
- 8.4.5.1 Savepoint和Checkpoint的区别。
- 8.4.5.2 Savepoint相关命令
- 8.4.6 状态一致性
- 8.4.7 状态一致性分类
- 8.4.7.1 AT-MOST-ONCE(最多一次)
- 8.4.7.2 AT-LEAST-ONCE(至少一次)
- 8.4.7.3 EXACTLY-ONCE(精确一次)
- 8.4.8 端到端精确一次保证
- 8.4.8.1 Source
- 8.4.8.2 Sink
- 8.4.8.3 两阶段提交对外部Sink系统的要求
- 8.4.8.4 两阶段提交Checkpoint执行流程Sink的相关操作
- 9 CEP(Complex Event Processing)
- 9.1 模式API
- 9.1.1 单个模式
- 9.1.1.1 模式操作where(condition)
- 9.1.1.2 模式操作or(condition)
- 9.1.1.3 模式操作until(condition)
- 9.1.1.4 模式操作subtype(subClass)
- 9.1.1.5 模式操作oneOrMore()
- 9.1.1.6 模式操作timesOrMore(#times)
- 9.1.1.7 模式操作times(#ofTimes)
- 9.1.1.8 模式操作times(#fromTimes, #toTimes)
- 9.1.1.9 模式操作optional()
- 9.1.1.10 模式操作greedy()
- 9.1.2 组合模式
- 9.1.2.1 严格连续next()
- 9.1.2.2 松散连续followedBy()
- 9.1.2.3 不确定的松散连续followedByAny()
- 9.1.2.4 严格连续的NOT模式notNext()
- 9.1.2.5 松散连续的NOT模式notFollowedBy()
- 9.1.3 模式组
- 9.2 从模式中选取
- 9.3 处理超时的部分匹配
- 9.4 CEP库中的时间
- 10 双流Join
- 10.1 union
- 10.2 connect
- 10.2.1 和普通流连接
- 10.2.2 和广播流连接
- 10.3 coGroup
- 10.4 Window Join
- 10.4.1 Tumbling Window Join
- 10.4.2 Sliding Window Join
- 10.4.3 Session Window Join
- 10.5 Interval Join
- 11 Table API & SQL
- 12 资源配置调优
- 12.1 配置TaskManager内存
- 12.1.1 进程总内存(Total Process Memory)
- 12.1.2 Flink总内存(Total Flink Memory)
- 12.1.3 框架堆内存(Framework Heap Memory)
- 12.1.4 任务堆内存(Task Heap Memory)
- 12.1.5 托管内存(Managed Memory)
- 12.1.6 框架堆外内存(Framework Off-heap Memory)
- 12.1.7 任务堆外内存(Task Off-heap Memory)
- 12.1.8 网络内存(Network Memory)
- 12.1.9 JVM Metaspace
- 12.1.10 JVM开销
- 12.2 并行度设置
- 13 RocksDB调优
- 13.1 开启State访问性能监控
- 13.2 开启本地恢复
- 13.3 调整预定义选项
- 13.4 开启分区索引功能
- 14 反压处理
- 14.1 反压的理解
- 14.2 反压的危害
- 14.3 定位反压节点
- 14.3.1 利用Web UI定位
- 14.3.2 利用Metrics定位
- 14.4 反压的解决方案
- 14.4.1 查看是否数据倾斜
- 14.4.2 使用火焰图分析
- 14.4.3 分析GC情况
- 15 数据倾斜
- 15.1 keyBy后的聚合操作存在数据倾斜
- 15.2 keyBy之前发生数据倾斜
- 15.3 keyBy后的窗口聚合操作存在数据倾斜
- 16 Job优化
- 16.1 链路延迟测量
- 16.2 开启对象重用
- 16.3 细粒度滑动窗口优化
- 17 Flink SQL调优
- 18 网络缓冲调优
- 18.1 概述
- 18.2 缓冲消胀机制
- 19 常见报错
- 19.1 非法配置异常
- 19.2 Java堆空间异常
- 19.3 直接缓冲存储器异常
- 19.4 元空间异常
- 19.5 网络缓冲区数量不足
- 19.6 Kafka动态发现分区
- 19.7 超出文件描述符限制
- 19.8 脏数据导致数据转发失败
- 19.9 通讯超时
1 Flink的特点
1. 支持Scala和Java API。
2. 支持批流一体。
3. 同时支持高吞吐、低延迟、高性能。
4. 支持事件时间和处理时间语义,基于事件时间语义能够针对无序事件提供精确、一致的结果;基于处理时间语义能够用在具有极低延迟需求中。
5. 支持不同时间语义下的窗口编程。
6. 支持具有Backpressure功能的持续流模型。
7. 提供精确一次(Exactly Once)的状态一致性保障。
8. Flink在JVM内部实现了自己的内存管理。
9. 基于轻量级的分布式快照CheckPoint的容错。
10. 支持SavePoint机制,手工触发,适用于升级。
11. 支持高可用配置(无单点失效),与k8s、Yarn、Apache Mesos紧密集成。
12. 提供常见存储系统的连接器,Kafka、Elasticsearch等。
13. 提供详细、可自由定制的系统及应用指标集合,用于提前定位和响应问题。
14. 支持复杂事件处理CEP。
2 流式计算架构
2.1 Lambda架构
数据从数据源被采集到大数据平台,在大数据平台中分成2条线:一条线进入流式计算,去计算一些实时指标。另一条线进入离线计算部分,去计算T+1的指标。
2.2 Kappa架构
LinkenIn提出的Kappa架构解决了之前Lambda架构需要维护实时和离线两套程序的问题。即数据进入到大数据平台,离线和实时都使用同一套框架代码(实时的指标就实时处理就完了, 离线的指标就可以看作实时做最近一段时间的数据聚合,所以Kappa架构的离线计算时间跨度不能太长)。
2.3 IOTA架构
为了应对多数据源复杂数据和多应用场景复杂需求,有人提出了面向未来的IOTA架构。
核心思路是:使用公共的数据模型来统一实时和离线计算(借助cache和history,空间换时间,来让查询变的更快速)。未来数据查询趋势就是即席查询,就是不管你查的是实时指标还是离线指标,都希望能够很快速的看到结果。
3 Flink部署
3.1 Standalone模式
本文所使用的版本为1.14.6。官网下载。
# 上传下载包到Linux服务器上
# 解压
tar -zxvf flink-1.14.6-bin-scala_2.12.tgz -C /opt
# 修改配置文件
vim /opt/flink-1.14.6/conf/flink-conf.yaml
# 以下所有配置只在Standalone模式下有用。
# 配置JobManager的rpc通信地址。必须指定一个具体的IP或者主机名,方便其他机器的TaskManager访问,如果直接指定localhost,则其他机器的TaskManager不能和JobManger进行访问
jobmanager.rpc.address: localhost
# 配置JobManager的rpc通信端口。
jobmanager.rpc.port: 6123
# JobManager的总内存大小,包括JVM的元空间以及其他的一些外部内存。
jobmanager.memory.process.size: 1600m
# TaskManager的总内存大小,包括JVM的元空间以及其他一些存储状态的外部内存。
taskmanager.memory.process.size: 1728m
# 每个TaskManager所包含的slots的个数,也就是一个TaskManager可以并行执行的任务个数,一个任务所占用的slots个数为并行度最大的那个算子的个数。
taskmanager.numberOfTaskSlots: 1
# 用于指定程序运行的默认并行度,默认并行度的数量必须小于(TaskManager的个数*每个TaskManager的slots个数)。
parallelism.default: 1
# JobManager所运行的机器,可以保持不变也可以配置成指定的IP
vim /opt/flink-1.14.6/conf/masters
localhost:8081
# TaskManager所运行的机器,TaskManager有多少机器就配置多少个机器的IP
vim /opt/flink-1.14.6/conf/workers
192.168.130.12
192.168.130.13
192.168.130.14
### 注意:在Standalone模式下,启动集群的机器必须是配置JobManager的机器。
## 把Flink目录分发到其他机器上。
### 启动集群
/opt/flink-1.14.6/bin/start-cluster.sh
### 启动之后通过jps可以查看到StandaloneSessionClusterEntrypoint和TaskManagerRunner进程
### 启动集群之后可以在浏览器通过http://JobManager部署机器主机名或者IP地址:8081/#/overview访问
### 停止集群
/opt/flink-1.14.6/bin/stop-cluster.sh
### 查看提交到Standalone模式下的正在运行的任务情况
/opt/flink-1.14.6/bin/flink list
/opt/flink-1.14.6/bin/flink list -r
### 查看提交到Standalone模式下所有的任务情况
/opt/flink-1.14.6/bin/flink list -a
### 查看提交到Standalone模式下关闭的任务情况
/opt/flink-1.14.6/bin/flink list -s
### 停止提交到Standalone模式下的任务
/opt/flink-1.14.6/bin/flink cancel JOB_ID
### Standalone模式下提交任务
./flink run -c <classname> -p <parallelism> -d -s <Savepoint or Checkpoint 目录> -n <jar包位置> <入口类参数>
-d # 后台运行
-n # 可以跳过通过checkpoint或者savepoint恢复任务无法映射到新程序的状态。
3.2 Standalone-HA模式
### 在Standalone模式的基础上在flink-conf.yaml配置文件中增加如下内容。
#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node01:8020/flink-checkpoints
# 使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs:///flink/ha/
# 配置ZK集群地址
high-availability.zookeeper.quorum: localhost:2181
# 默认是open,如果 zookeeper security 启用了更改成 creator
high-availability.zookeeper.client.acl: open
# 用于启用/禁用增量checkpoints的标志,RocksDB state backend就支持增量checkpoint
state.backend.incremental: false
### 因为Standalone-HA使用了HDFS保存JobManager的元数据,那么集群中就必须有HDFS服务,通过配置echo ${HADOOP_CLASSPATH}命令查看是否能输出Hadoop相关的依赖,如果不能的话,在/etc/profile配置如下并加载
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
### 在masters配置文件中增加副节点的地址
vim /opt/flink-1.14.6/conf/masters
主节点:8081
副节点:8081
### 修改副节点的flink-conf.yaml配置文件jobmanager.rpc.address
vim /opt/flink-1.14.6/conf/masters
jobmanager.rpc.address: 副节点
### 后续的操作和Standalone模式相同
3.3 Flink on Yarn模式
以Yarn模式部署Flink任务时,Hadoop环境版本建议在2.5.0以上。
为什么使用Flink on Yarn模式呢?
1. Yarn的资源可以按需使用,提高集群的资源利用率。
2. Yarn的任务有优先级,根据优先级运行作业。
3. JobManager和TaskManager进程都由Yarn NodeManager监控。如果JobManager进程异常退出,则Yarn ResourceManager会重新调度JobManager到其他机器。如果TaskManager进程异常退出,JobManager会受到消息并重新向Yarn ResourceManager申请资源,重新启动TaskManager。
Flink提供了三种在Yarn上运行的模式,分别为Session mode、Per-Job mode以及Application mode。
3.3.1 Session mode
会话模式会先在Yarn集群中启动一个Flink集群,并重复使用该集群。在同一集群中执行的应用程序使用相同的资源,因此会竞争相同的资源。这样做的好处是,您不必为每个提交的作业支付启动整个集群的资源开销。但是,如果其中一个作业行为不当或导致TaskManager关闭,那么在该TaskManager上运行的所有作业都将受到故障的影响。这除了会对导致故障的作业产生负面影响外,还意味着潜在的大规模恢复过程,所有重新启动的作业都会并发地访问文件系统,并使其对其他服务不可用。此外,让一个集群运行多个作业意味着JobManager的负载更大,因为它负责记录集群中的所有作业。在Flink社区是不推荐这种部署方式的。
# 在老版本中需要提供Hadoop的依赖包放在Flink的lib目录下,现在只需要在命令行输入echo ${HADOOP_CLASSPATH}命令检查是否有Hadoop的依赖包,如果没有需要使用如下命令配置
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
# Flink提交任务到Yarn上后需要把相关的依赖包上传到HDFS上,可以提前把这些依赖包上传然后提交任务的时候通过-Dyarn.provided.lib.dirs参数指定jar包的位置,多个目录之间以逗号分割。这样所有作业就不必各自上传依赖,可以直接从HDFS拉取,并且YARN NodeManager也会缓存这些依赖,进一步加快作业的提交过程。
# 提交任务
# 需要事先申请资源
bin/yarn-session.sh -s <TaskSlots> -tm <每个TaskManager的内存(单位:MB)> -jm <JobManager的内存> -nm <Yarn的appName> -d
### 提交任务的命令
./flink run \
-t yarn-session \
-d \ # 分离模式,后台执行
-p <parallelism> \ # 并行度
-Dclassloader.check-leaked-classloader=false \ # 如果这个类加载器阻止了任务的运行,则禁用它
-Dyarn.application.id=<Yarn-application-id> \ # 指定启动的Session集群的Yarn-id
-Dyarn.provided.lib.dirs="" \ # 上传到HDFS上的Flink依赖包以及Yarn的依赖包。
-Dexecution.savepoint.ignore-unclaimed-state=true \ #可以跳过通过checkpoint或者savepoint恢复任务无法映射到新程序的状态。
-c <className> \
<Jar包位置>
# session mode 的每个TaskManager的内存以及slots和JobManager的内存在启动session mode集群的时候指定,在提交任务的时候修改不了。
# 停止yarn-session
yarn application -kill application_xxx
3.3.2 Per-Job mode
为了提供更好的资源隔离保证,Per-Job模式使用可用的资源提供程序框架(例如YARN, Kubernetes)为每个提交的作业启动一个集群。此集群仅对该作业可用。当作业完成时,集群将被删除,任何遗留的资源(文件等)都将被清除。这提供了更好的资源隔离,因为行为不端的作业只能关闭自己的任务管理器。此外,它将负载分散到多个JobManager中,因为每个作业有一个JobManager。由于这些原因,Per-Job资源分配模型是许多生产原因的首选模式。
# 在老版本中需要提供Hadoop的依赖包放在Flink的lib目录下,现在只需要在命令行输入echo ${HADOOP_CLASSPATH}命令检查是否有Hadoop的依赖包,如果没有需要使用如下命令配置
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
# Flink提交任务到Yarn上后需要把相关的依赖包上传到HDFS上,可以提前把这些依赖包上传然后提交任务的时候通过-Dyarn.provided.lib.dirs参数指定jar包的位置,多个目录之间以逗号分割。这样所有作业就不必各自上传依赖,可以直接从HDFS拉取,并且YARN NodeManager也会缓存这些依赖,进一步加快作业的提交过程。
# 提交任务命令
bin/flink run \
-t yarn-per-job \
-d \ # 后台运行
-Dyarn.provided.lib.dirs="" \ # 上传到HDFS上的Flink依赖包以及Yarn的依赖包。
-Dclassloader.check-leaked-classloader=false \ # 如果这个类加载器阻止了任务的运行,则禁用它
-Djobmanager.memory.process.size=2048mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.name=<Yarn的appName> \
-Dexecution.savepoint.ignore-unclaimed-state=true \ #可以跳过通过checkpoint或者savepoint恢复任务无法映射到新程序的状态。
-Dexecution.savepoint.path="" \ # Savepoint or Checkpoint 目录
-Dpipeline.operator-chaining=false \ # 禁用operator chain
-p <parallelism> \
-c <className> \
<Jar包位置>
3.3.3 Application mode
在所有其他模式中,应用程序的main()方法在客户端执行。这个过程包括在本地下载应用程序的依赖项,执行main()来生成Flink运行时可以理解的应用程序的表示(即JobGraph),并将依赖项和JobGraph发送到JobManager。这使得客户端成为一个沉重的资源消费者,因为它可能需要大量的网络带宽来下载依赖项并将二进制文件发送到集群,并且需要CPU周期来执行main()。当跨用户共享客户机时,这个问题会更加明显。
基于这个原因,Application Mode诞生了,应用程序的main()方法在JobManager上执行。该会话集群仅在特定应用程序的作业之间共享,并在应用程序完成时关闭。使用这种体系结构,应用程序模式提供了与Per-Job模式相同的资源隔离和负载平衡保证,但是是在整个应用程序的粒度上。在JobManager上执行main()可以节省所需的CPU周期,还可以节省在本地下载依赖项所需的带宽。此外,由于每个应用程序有一个JobManager,因此它允许更均匀地分散网络负载,以便下载集群中应用程序的依赖项。
# 在老版本中需要提供Hadoop的依赖包放在Flink的lib目录下,现在只需要在命令行输入echo ${HADOOP_CLASSPATH}命令检查是否有Hadoop的依赖包,如果没有需要使用如下命令配置
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
# Flink提交任务到Yarn上后需要把相关的依赖包上传到HDFS上,可以提前把这些依赖包上传然后提交任务的时候通过-Dyarn.provided.lib.dirs参数指定jar包的位置,多个目录之间以逗号分割。这样所有作业就不必各自上传依赖,可以直接从HDFS拉取,并且YARN NodeManager也会缓存这些依赖,进一步加快作业的提交过程。
# 提交任务命令
bin/flink run-application \
-t yarn-application \
-d \ # 后台运行
-Dclassloader.check-leaked-classloader=false \ # 如果这个类加载器阻止了任务的运行,则禁用它
-Dyarn.provided.lib.dirs="" \ # 上传到HDFS上的Flink依赖包以及Yarn的依赖包。
-Djobmanager.memory.process.size=2048mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dyarn.application.name=<Yarn的appName> \
-Dexecution.savepoint.ignore-unclaimed-state=true \ #可以跳过通过checkpoint或者savepoint恢复任务无法映射到新程序的状态。
-Dexecution.savepoint.path="" \ # Savepoint or Checkpoint 目录
-Dpipeline.operator-chaining=false \ # 禁用operator chain
-p <parallelism> \
-c <className> \
<Jar包位置>
4 Flink运行时的组件
4.1 JobManager(作业管理器)
1. 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
2. JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(Logical DataFlow Graph)和打包了所有的类库和其他资源的Jar包。
3. JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做执行图(ExecutionGraph),包含了所有可以并发执行的任务。
4. JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它的的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(CheckPoint)的协调。
4.1.1 ResourceManager(资源管理器)
1. 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManager插槽是Flink中定义的处理资源单元。
2. Flink为不同的环境和资源提供者实现了对应的ResourceManager,在Standalone设置中,ResourceManager只能分配可用TaskManager的 slots,而不能自行启动新的TaskManager。
3. 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
4.1.2 Dispatcher(分发器)
1. 可以跨作业运行,它为应用提交提供了Rest接口。并为每个提交的作业启动一个新的JobMaster。
2. 当一个应用被提交执行时,分发器就会启动并将应用转交给一个JobManager。
3. Dispatcher也会启动一个Web UI,用来方便得展示和监控作业执行的信息。
4. Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
4.1.3 JobMaster
JobMaster负责管理单个JobGraph的执行。Flink集群中可以同时运行多个作业,每个作业都有自己的JobMaster。
4.2 TaskManager(任务管理器)
1. Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
2. 启动之后,TaskManager会向资源管理器注册它的插槽,收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
3. 在执行过程中,一个TaskManager可以跟其它运行同一个应用程序的TaskManager交换数据。
5 Flink任务提交流程
5.1 Standalone模式
5.2 Flink on Yarn模式
6 Flink的重要概念
6.1 DataFlow逻辑数据流
Flink上运行的程序会被映射成逻辑数据流(DataFlow),它包含了三部分,Source、Transformations、Sink。每一个DataFlow以一个或多个Source开始以一个或多个Sink结束。DataFlow类似于任意的有向无环图(DAG)。
6.2 ExecutionGraph执行图
1. StreamGraph是根据用户通过Stream API编写的代码生成的最初的图,用来表示程序的拓扑结构。
2. JobGraph是StreamGraph经过优化后生成的图,提交给JobManager的数据结构。主要的优化为将多个符合条件的节点chain在一起作为一个节点。
3. ExecutionGraph是JobManager根据JobGraph生成的。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
4. JobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的图,并不是一个具体的数据结构。
6.2 TaskSlot任务槽
Flink通过TaskSlot的数量来控制TaskManager上可以最多并行执行的任务数量。TaskSlot指TaskManager上可以同时运行的线程数(并行度)。Task Slot是静态的概念,是指TaskManager具有的并发执行能力。
6.3 Slot Sharing槽共享
一个TaskSlot上并不是只能运行一个Operator或者Operator Chain,在没有指定各个算子的槽位名称(可以用过调用slotSharingGroup()方法指定)的时候,如果某个TaskSlot中的某个Operator或者Operator Chain的SubTask执行完成时,该TaskSlot可以共享给后续的其他Operator或Operator Chain的SubTask使用。
如果某个算子改变了共享槽位的名称后,这个算子后面的算子没有在设置共享槽位的名称,那么这两个算子的槽位名称相同,槽位名称不同的SubTask不能同在一个槽位执行。
6.4 Parallelism并行度
Flink中的并行度可以设置全局的,也可以给每个算子设置不同的并行度。对于每个算子来说,并行度就是这个算子的操作可以同时执行的个数。
6.5 Task
每个Operator Chain或无法合并的单个Operator叫做Task。
6.6 SubTask
一个Operator Chain或无法合并的单个Operator因为所设置的并行度而具有多个并行的Task。每一个Task对于这个算子或算子链来说就是一个SubTask。
6.7 Partition
分区是整个数据流或数据集的独立子集。通过将每条记录分配给一个或多个分区,来把数据流或数据集划分为多个分区。在运行期间,Task 会消费数据流或数据集的分区。改变数据流或数据集分区方式的转换通常称为重分区。
6.8 数据传输形式
6.8.1 One-to-one
Stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
6.8.2 Redistributing
Stream的分区会发生改变。每一个算子的子任务依据所选择的Transformation发送数据到不同的目标任务。例如,keyBy基于hashCode重分区、而broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。
6.9 Operator Chain
Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(Local Forward)的方式进行连接。并行度相同、是one-to-one操作以及共享槽位名称相同,则这两个算子可以链接起来。
7 Transformation
7.1 基本操作map、flatMap、filter
// TODO 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO map
dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}).print();
//TODO flatMap
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
}).print();
//TODO filter
dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return !value.contains("TMD");//如果是字符串包含TMD则表示过滤掉
}
}).print();
7.2 滚动聚合、reduce聚合
//TODO sum、min、minBy、max、maxBy
//TODO minBy()和min()以及max和maxBy的区别就是:在一条记录中max()和min()除去要统计的字段,其他字段的值都是第一条记录的值,而maxBy()和minBy()其他字段的值都是当前最大值或最小值那条记录的值。
dataStream.keyBy(x->x.getId())
.sum("temperature")
.print();
//TODO reduce
dataStream.keyBy(x->x.getId())
.reduce(new ReduceFunction<SensorReading>() {
/**
* @param value1 value1是前两条记录聚合后的数据
* @param value2 value2是最新的一条数据
*/
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
}
}).print();
7.3 分流操作
//TODO 这种分流操作使用了侧输出流的方式。
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//需求:对流中的数据按照奇数和偶数拆分并选择
OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
OutputTag<Integer> evenTag = new OutputTag<>("偶数",TypeInformation.of(Integer.class));
SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
DataStream<Integer> oddResult = result.getSideOutput(oddTag);
DataStream<Integer> evenResult = result.getSideOutput(evenTag);
7.4 分区和自定义分区
//TODO global 所有的数据全部发送到下一个Operator的第一个SubTask上,相当于丢弃了分布式计算的优势。
dataStream.global();
//TODO broadcast 广播形式,所有的数据发送到下一个Operator的每一个SubTask上,
dataStream.broadcast();
//TODO shuffle 随机分区
dataStream.shuffle();
//TODO rebalance 轮询分区
dataStream.rebalance();
//TODO recale 本地轮流分配
dataStream.recale();
//TODO keyBy 按照Key的hash值取模分区
dataStream.keyBy(x->x.getId())
//TODO 自定义分区
dataStream.pariotionCustom(new MyPartitioner(),0)
8 Flink四大基石
8.1 Window窗口
8.1.1 Window窗口的应用场景
在实际开发中,如果出现每天、每小时等的统计任务,这些需要就需要用到Window窗口。
8.1.2 时间窗口(Time Winodow)
8.1.2.1 滚动时间窗口
// TODO 将数据依据固定的窗口长度对数据进行划分。时间对齐,窗口长度固定,没有重叠数据。
// TODO 开窗之后的函数有两类,一类为增量聚合函数有:sum()、min()、minBy()、max()、maxBy()、reduce()、aggregate()。另一类为全窗口函数,会先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。process(new ProcessWindowFunction())、apply(new WindowFunction())
dataStream.keyBy(x->x.getId())
//.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
// 当需求是统计一天的数据的时候,开窗时间为一天,但是必须指定偏移量,并且因为开窗后只有在开窗结束的时候才会返回统计的结果,可以指定触发器,这里的触发器10秒触发一次统计结果。
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//基于处理时间的滚动时间窗口,
.sum("count")
.print();
//输入 累加器 输出
new AggregateFunction<SensorReading, Integer, Integer>() {
//初始化累加器
@Override
public Integer createAccumulator() {
return 0;
}
//根据需求来一条数据更新累加器的数据
@Override
public Integer add(SensorReading value, Integer accumulator) {
return accumulator + 1;
}
// 返回累加器的结果
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
//合并两个累加器的值
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
}
8.1.2.2 滑动时间窗口
// TODO 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。窗口长度固定,数据可以有重叠。
dataStream.keyBy(x->x.getId())
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))//基于处理时间的滑动时间窗口,窗口长度为10秒,滑动步长为5秒,那么会有5秒的数据重复,一条数据所能同时存在的窗口等于窗口长度和滑动步长的除数。
.sum("count")
.print();
8.1.2.3 会话窗口
// TODO 由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口。时间不对齐。
dataStream.keyBy(x->x.getId())
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))//基于处理时间的会话窗口,如果10秒钟没有数据到来,后续到来的数据会重新开一个窗口。
.sum("count")
.print();
8.1.3 计数窗口(Count Window)
使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。
8.1.3.1 滚动计数窗口
dataStream.keyBy(x->x.getId())
.countWindow(10)//滚动计数窗口,10条记录为一个窗口
.sum("count")
.print();
8.1.3.2 滑动计数窗口
dataStream.keyBy(x->x.getId())
.countWindow(10,5)//滑动计数窗口,10条记录为一个窗口,滑动步长为5条记录,
.sum("count")
.print();
8.2 Time
8.2.1 Flink中的时间语义
1. Event Time 事件创建的时间。
2. Ingestion Time 摄入时间,数据进入Flink的时间。
3. Processing Time 处理时间,执行操作算子的本地系统时间,与机器无关。
在实际开发中,一般以事件时间作为时间统计的标准,但是因为网络等因素,可能会让数据乱序到达。在Flink中使用Watermarker来处理事件时间。
8.2.2 Watermark(水位线)
8.2.2.1 Watermark的特点
1. 在Flink中Warkermark是一条特殊的数据记录,只是一个时间戳。
2. Watermark是一种衡量Event Time进展的机制,可以设定延迟触发。
3. Watermark是用于处理乱序数据的,而正确的处理乱序数据,通常用Watermark机制结合Window来实现。
4. 数据流中的Watermark用于表示timestamp小于Watermark的数据都已经到达了。因此,Window的执行也是由Watermark触发的。
5. Watermark用来让程序员自己平衡延迟和结果的正确性。
6. Watermark必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark有两种生成方式,一种为每条数据都生成一条Watermark数据插入到数据中。另一种为间隔一段时间生成一条Watermark数据插入到数据中。选择那种方式根据数据记录的稀疏程度来确定。默认为间隔一段时间。
7. Watermark与数据中的时间戳字段有关。
8. Watermark = 当前进来的数据最大的事件时间 - 最大允许的数据延迟时间。
8.2.2.2 Watermark的传递
在分区操作中,每个分区的Watermark都来了之后会拿一个最小的Watermark向下游分区传递。如果读取源数据和生成Watermark之间经过了重分区操作。那么后续的开窗操作的触发计算就会受到WaterMark的影响。所以Watermark应该是越靠近源数据越好。
8.2.2.3 Watermark示例
OutputTag<SensorReading> outputTag = new OutputTag("传感器温度信息", BasicTypeInfo.of(SensorReading.class));
SingleOutputStreamOperator<SensorReading> result = stream.map(x -> {
String[] arr = x.split(",");
return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));
}).assignTimestampsAndWatermarks(
WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withIdleness(Duration.ofMinutes(1));//在多数据源或者多分区的时候,如果某个分区的数据没有了,其他分区的数据照常来,按照barriers对齐的原则,这时候的Watermark就不增长了,可以使用这个如果一分钟数据没有到来,则标记这个分区或者数据源为空闲数据源。
.withTimestampAssigner((x, y) -> x.getTimestamp()*1000)) //TODO 指定的时间戳必须是毫秒级的。forBoundedOutOfOrderness 是处理乱序的方法,需要指定最大乱序时间,
/*WatermarkStrategy.<SensorReading>forMonotonousTimestamps()
.withTimestampAssigner((x,y)->x.getTimestamp()*1000)*/ //forMonotonousTimestamps是处理没有乱序数据的方法,不需要指定最大乱序时间
.keyBy(SensorReading::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(6)))//窗口的起始点和偏移量是根据WindowAssigner这个抽象类的assignWindows这个抽象方法来实现的。
.allowedLateness(Time.seconds(30))//延迟30s,在Watermark到达窗口触发的条件时,晚30s关闭窗口,后续到来的数据每来一条数据会汇总到之前计算的结果中。
.sideOutputLateData(outputTag)//侧输出流解决数据丢失的问题,可以聚合之前的统计结果。可以直接在本任务中聚合到最后的输出结果中,也可以输出到别的地方进行批处理聚合。一条数据只有在不属于任何一个当前开着的窗口的时候才会进入到侧输出流中。
.minBy("temperature");//这里只能通过聚合的方式处理。
result.print("result:");
result.getSideOutput(outputTag).print("sideOutput:");
//TODO Watermark的生成位置越靠近数据源越好,对于Kafka可以直接在读取数据的时候生成Watermark,使用这种方式,数据源通常可以更精准地跟踪Watermark,整体Watermark生成将更精确。直接在源上指定WatermarkStrategy意味着你必须使用特定数据源接口.
8.3 State
Flink的计算分为有状态的和无状态的,如基本算子map、flatMap、filter就是无状态算子;而sum、reduce、max等这些都是有状态的算子。可以认为状态就是一个本地变量,可以被任务的业务逻辑访问。Flink会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑。在Flink中,总的来说有两种状态,一个是算子状态,另一个是键控状态。
8.3.1 Operatior State(算子状态)
算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态。状态对于同一子任务(SubTask)而言是共享的。算子状态不能由相同或不同算子的另一个子任务访问。算子状态可以用于所有算子。常用于Source和Sink,通过实现CheckpointedFunction接口实现算子状态。
public class BufferingSink implements SinkFunction<Tuple2<String,Integer>>,CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
// TODO 进行checkpoint时会调用snapshotState()
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();//清理内容数据并存入CheckPoint磁盘目录中
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
//TODO 用户自定义函数初始化时会调用initializeState(),初始化包括第一次自定义函数初始化和从之前的checkpoint恢复。因此initializeState()不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。
//TODO 当前operator state以list的形式存在。这些状态是一个可序列化对象的集合List,彼此独立,方便在改变并发后进行状态的重新分派。换句话说,这些对象是重新分配non-keyed state的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
//TODO Even-split redistribution:每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。比如说,算子A的并发读为1,包含两个元素element1和element2,当并发读增加为2时,element1会被分到并发0上,element2则会被分到并发1上。
//TODO Union redistribution:每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。
//TODO 调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如getUnionListState(descriptor)会使用union redistribution算法,而 getListState(descriptor)则简单的使用Even-split redistribution算法。
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
//TODO 当初始化好状态对象后,我们通过isRestored()方法判断是否从之前的故障中恢复回来,如果该方法返回true则表示从故障中进行恢复,会执行接下来的恢复逻辑。
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements CheckpointedFunction {
/** current offset for exactly once semantics */
private Long offset = 0L;
/** flag for job cancellation */
private volatile boolean isRunning = true;
/** 存储 state 的变量. */
private ListState<Long> state;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
"state",
LongSerializer.INSTANCE));
// 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
for (Long l : state.get()) {
offset = l;
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.add(offset);
}
}
8.3.2 Keyed State(键控状态)
键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。只能用在KeyedStream上的算子中。每个Key对应一个State,一个Operator实例处理多个Key,访问相应的多个State,并发改变,State随着Key在实例间迁移。通过RuntimeContext访问,支持的数据结构有ValueState、ListState、ReducingState、AggregatingState、MapState等状态。因为是通过RuntimeContext来访问的,运行时上下文是存在于抽象类AbstractRichFunction中的,所以必须继承AbstractRichFunction的子类来实现具体的业务。
8.3.2.1 Value state(值状态)
//TODO 保存一个可以更新和检索的值(如下所述,每个值都对应到当前的输入数据的key,因此算子接收到的每个key 都可能对应一个值)。 这个值可以通过 update(T)进行更新,通过T value()进行检索。
private class CustomFunction extends KeyedProcessFunction<String,String,String>{
private ValueState<BigDecimal> sum;
@Override
public void open(Configuration parameters) throws Exception {
sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", BigDecimal.class));//声明一个键控状态
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (sum.value()!=null){ //获取状态,获取状态后首先需要判断状态是否为空
sum.update();//更新状态
}
sum.clear();//清空状态
}
}
8.3.2.2 List state(列表状态)
//TODO 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)或者addAll(List<T>)进行添加元素,通过Iterable<T> get()获得整个列表。还可以通过update(List<T>)覆盖当前的列表。
private class CustomFunction extends KeyedProcessFunction<String,String,String>{
private ListState<Student> student;
@Override
public void open(Configuration parameters) throws Exception {
student = getRuntimeContext().getListState(new ListStateDescriptor<>("student", Student.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
student.get();//对于ListState状态来说get为获取状态的集合
student.add();//给状态集合添加一条记录
student.addAll();//给状态集合添加一个同类型的集合
student.update();//更新状态集合
student.clear();//清空状态
}
}
8.3.2.3 Map state(映射状态)
//TODO 维护了一个映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)或者putAll(Map<UK,UV>)添加映射。使用get(UK)检索特定key。使用entries(),keys()和values()分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()来判断是否包含任何键值对。
private class CustomFunction extends KeyedProcessFunction<String,String,String>{
private MapState<String,BigDecimal> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapState", String.class,BigDecimal.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
mapState.put("",BigDecimal.ZERO); //给状态增加一条记录
mapState.get("key");//通过key获取value值
Iterable<BigDecimal> values = mapState.values();//返回value的集合
Iterable<String> keys = mapState.keys();//返回key的集合
boolean key = mapState.contains("key");//判断指定的key是否在状态中存在
boolean empty = mapState.isEmpty();//判断状态是否为空
Iterable<Map.Entry<String, BigDecimal>> entries = mapState.entries();//返回可遍历集合
Iterator<Map.Entry<String, BigDecimal>> iterator = mapState.iterator();//返回可遍历集合
mapState.putAll(new HashMap<>());//给状态添加一个同类型的集合
mapState.remove("key");//根据指定的key删除数据
mapState.clear();//清空状态
}
}
8.3.2.4 Reducing state(聚合状态)
//TODO 这种方式在实际开发中基本不适用,和keyBy(x->x.getId()).reduce(new ReduceFunction())方式的效果相同,如果需要使用定时器之类的其他操作可以使用这种方式。
//TODO 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState类似,但使用add(T)增加元素,会使用提供的ReduceFunction进行聚合。
private class CustomFunction extends KeyedProcessFunction<String, String, String> {
private ReducingState<Integer> reducingState;
@Override
public void open(Configuration parameters) throws Exception {
reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("reducingState", (a, b) -> a + b, Integer.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
reducingState.add(0);
reducingState.get();
reducingState.clear();
}
}
8.3.2.5 Aggregating state(聚合状态)
//TODO 这种方式在实际开发中基本不适用,和keyBy(x->x.getId()).aggregate(new AggregateFunction())方式的效果相同,如果需要使用定时器之类的其他操作可以使用这种方式。
//TODO 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState相反的是,聚合类型可能与添加到状态的元素的类型不同。接口与ListState类似,但使用 add(IN)添加的元素会用指定的AggregateFunction进行聚合。
private class CustomFunction extends KeyedProcessFunction<String, String, String> {
private AggregatingState<Integer,Integer> aggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor("aggregatingState", new AggregateFunction<Integer, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Integer value, Integer accumulator) {
return accumulator+value;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a+b;
}
},Integer.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
aggregatingState.add();
aggregatingState.get();
aggregatingState.clear();
}
}
8.3.2.6 状态有效期(TTL)
//TODO 状态上次的修改时间会和数据一起保存在state backend中,因此开启该特性会增加状态数据的存储。Heap state backend会额外存储一个包括用户状态以及时间戳的Java对象,RocksDB state backend会在每个状态值(list 或者 map 的每个元素)序列化后增加8个字节。
//TODO 暂时只支持基于processing time的TTL,不支持事件时间。
//TODO 尝试从checkpoint/savepoint进行恢复时,TTL的状态(是否开启)必须和之前保持一致,否则会遇到“StateMigrationException”。
//TODO TTL的配置并不会保存在checkpoint/savepoint中,仅对当前Job有效。
//TODO 当前开启TTL的map state仅在用户值序列化器支持null的情况下,才支持用户值为null。如果用户值序列化器不支持null,可以用NullableSerializer 包装一层。
//TODO 启用TTL配置后,StateDescriptor中的defaultValue将会失效,用户需要手动管理那些实际值为null或已过期的状态默认值。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1)) //TODO 状态的有效期
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //TODO 状态的更新策略 StateTtlConfig.UpdateType.OnCreateAndWrite :仅在创建和写入时更新,也是默认更新策略;StateTtlConfig.UpdateType.OnReadAndWrite:读取时也更新
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//TODO 数据在过期但还未被清理时的可见性。StateTtlConfig.StateVisibility.NeverReturnExpired:不返回过期数据,这也是默认配置;StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:会返回过期但未清理的数据,数据在物理删除前都会返回。
.disableCleanupInBackground()//TODO 默认情况下,过期数据会在读取的时候被删除,同时会有后台线程定期清理(如果StateBackend支持的话)。可以通过StateTtlConfig配置关闭后台清理
.cleanupFullSnapshot()//TODO 启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。这种策略在RocksDBStateBackend的增量checkpoint模式下无效。
.cleanupIncrementally(10, true)//TODO 如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。每次触发增量清理时,从迭代器中选择已经过期的数进行清理。该策略有两个参数。第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。Heap backend默认会检查5条状态,并且关闭在每条记录时触发清理。增量清理会增加数据处理的耗时。现在仅Heap state backend支持增量清除机制。在 RocksDB state backend上启用该特性无效。如果没有state访问,也没有处理数据,则不会清理过期数据。对已有的作业,这个清理方式可以在任何时候通过StateTtlConfig启用或禁用该特性,比如从savepoint重启后。
.cleanupInRocksdbCompactFilter(1000)//TODO 如果使用RocksDB state backend,则会启用Flink为RocksDB定制的压缩过滤器。RocksDB会周期性的对数据进行合并压缩从而减少存储空间。Flink提供的RocksDB压缩过滤器会在压缩时过滤掉已经过期的状态数据。这个参数是处理状态的条数。时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用JNI的开销,因此会影响整体的压缩性能。RocksDB backend的默认后台清理策略会每处理1000条数据进行一次。压缩时调用TTL过滤器会降低速度。TTL过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。对于集合型状态类型(比如list和map),会对集合中每个元素进行检查。对于元素序列化后长度不固定的列表状态,TTL过滤器需要在每次JNI调用过程中,额外调用Flink的java序列化器,从而确定下一个未过期数据的位置。对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
8.4 CheckPoint
8.4.1 State Backends(状态后端)
在启动CheckPoint机制时,状态会随着CheckPoint而持久化,以防止数据丢失、保障恢复时的一致性。状态内部的存储格式、状态在CheckPoint时如何持久化以及持久化在哪里均取决于选择的State Backend。
1. 每传入一条数据,有状态的算子任务都会读取和更新状态。
2. 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。
3. 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
4. 状态后端主要负责两件事:本地的状态管理(在TaskManager的JVM中保存),以及将检查点(CheckPoint)状态写入远程存储。
8.4.1.1 MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将他们管理在TaskManager的JVM堆上,而将CheckPoint存储在JobManager的内存中。由于是存储在内存中,所以它的特点是:快速、低延迟、但是不稳定。
MemoryStateBackend能配置异步快照。异步快照默认是开启的。用户可以在实例化MemoryStateBackend的时候,将相应布尔类型的构造参数设置为false来关闭异步快照(仅在debug的时候使用)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false));
//TODO MemoryStateBackend 的限制:
//TODO 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。
//TODO 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于在JobManager和TaskManager之间发送的消息的最大大小,这个最大大小由参数akka.framesize控制,默认为10M。
//TODO MemoryStateBackend 适用场景:
//TODO 本地开发和调试。
//TODO 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。
8.4.1.2 FsStateBackend
FsStateBackend 需要配置一个文件系统的URL(类型、地址、路径),例如:“hdfs://namenode:40010/flink/checkpoints"或"file:///data/flink/checkpoints”。
FsStateBackend将正在运行中的状态数据保存在TaskManager的内存中。CheckPoint时,将状态快照写入到配置的文件系统目录中。少量的元数据信息存储到JobManager的内存中(高可用模式下,将其写入到CheckPoint的元数据文件中)。
FsStateBackend默认使用异步快照来防止CheckPoint写状态时对数据处理造成阻塞。用户可以在实例化FsStateBackend的时候,将相应布尔类型的构造参数设置为false来关闭异步快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(path, false));
//TODO FsStateBackend 适用场景:
//TODO 状态比较大、窗口比较长、key/value状态比较大的Job。
//TODO 所有高可用的场景。
8.4.1.3 EmbeddedRocksDBStateBackend
<!-- 如果在IDE中指定状态后端为RocksDB或者需要通过编程方式修改它,需要添加依赖。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.6</version>
<scope>provided</scope>
</dependency>
EmbeddedRocksDBStateBackend将正在运行中的状态数据保存在RocksDB数据库中,RocksDB数据库默认将数据存储在TaskManager的数据目录。数据以序列化字节数组的方式存储,这种方式由序列化器决定,因此key之间的比较是以字节序的形式进行而不是使用Java的 hashCode或equals()方法。
EmbeddedRocksDBStateBackend只支持异步快照。RocksDBStateBackend的单个Key最大为2G。
EmbeddedRocksDBStateBackend适用状态非常大、窗口非常长、key/value状态非常大的Job以及所有高可用的场景。
可以保留的状态大小仅受磁盘空间的限制。EmbeddedRocksDBStateBackend允许存储非常大的状态。然而,这也意味着使用EmbeddedRocksDBStateBackend将会使应用程序的最大吞吐量降低。所有的读写都必须序列化、反序列化操作,这个比基于堆内存的state backend的效率要低很多。同时因为存在这些序列化、反序列化操作,重用放入EmbeddedRocksDBStateBackend的对象是安全的。
EmbeddedRocksDBStateBackend是目前唯一支持增量Checkpoint的State Backend。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); //TODO 使用RocksDB并开启增量快照。
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
# 在任务提交的时候指定如下参数也可以使用RocksDB状态后端
-Dstate.backend=rocksdb \
-Dstate.checkpoints.dir="hdfs://namenode:40010/flink/checkpoints" \
-Dstate.backend.incremental=true \
# 一旦启用了增量快照,网页上展示的Checkpointed Data Size只代表增量上传的数据量,而不是一次快照的完整数据量。
8.4.1.4 HashMapStateBackend
//TODO 由于MemoryStateBackend以及FsStateBackend的状态都是存储在TaskManager的JVM堆上的,只是Checkpoint的保存位置不同,所以社区新增了一个HashMapStateBackend状态后端。在HashMapStateBackend内部,数据还是以Java对象的形式存储在堆中。Key/value形式的状态和窗口算子会持有一个hash table,其中存储着状态值、触发器。可以用在所有的高可用场景。因为数据以对象形式存储在堆中,因此重用这些对象数据是不安全的。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
//类似于FsStateBackend,状态存储在HDFS上或者存储在其他的分布式文件系统上。
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());//TODO 类似于MemoryStateBackend,Checkpoint快照数据存储在JobManager上。
// TODO 除了通过代码写死具体的配置以外,还可以通过任务提交的方式指定状态后端.
-Dstate.backend=hashmap \
-Dstate.checkpoint-storage=jobmanager \
//TODO 在任务提交的时候增加上面两个参数的话则认为是使用了类似MemoryStateBackend状态后端。Checkpoint保存在JobManager
-Dstate.backend=hashmap \
-Dstate.checkpoints.dir="hdfs://namenode:40010/flink/checkpoints" \
//TODO 在任务提交的时候增加上面两个参数的话则认为是使用了类似FsStateBackend状态后端。Checkpoint保存在HDFS上。
8.4.2 Checkpoint使用方式
Checkpoint使Flink的状态具有良好的容错性,通过checkpoint机制,Flink可以对作业的状态和计算位置进行恢复。Flink中的每个方法或算子都能够是有状态的。状态化的方法在处理单个元素/事件的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。为了让状态容错,Flink需要为状态添加checkpoint(检查点)。Checkpoint使得Flink能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
//TODO 默认情况下checkpoint是禁用的。通过这个属性开启checkpoint,里面的两个参数第一个是checkpoint的间隔,单位毫秒。
//TODO 精确一次(exactly-once)对比至少一次(at-least-once): 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
streamExecutionEnvironment.enableCheckpointing(Time.seconds(2).toMilliseconds(),CheckpointingMode.EXACTLY_ONCE);
//TODO Flink为所有检查点和保存点提供可选的压缩(默认情况下关闭的)。压缩算法使用的snappy。压缩在键控状态下的键组粒度上进行,即每个键组可以单独解压缩,这对于重新缩放很重要。压缩选项对增量快照没有影响,
streamExecutionEnvironment..getConfig().setUseSnapshotCompression(true);
CheckpointConfig checkpointConfig = streamExecutionEnvironment.getCheckpointConfig();
//TODO Checkpoint在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时checkpoint就会被删除。可以通过配置来保留checkpoint,
// 这些被保留的checkpoint在作业失败或取消时不会被清除。这样,你就可以使用该checkpoint来恢复失败的作业。
//TODO ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业保留的checkpoint。
//TODO ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被保留。
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO checkpoint超时:如果checkpoint执行的时间超过了该配置的阈值,还在进行中的checkpoint操作就会被抛弃。
checkpointConfig.setCheckpointTimeout(Time.minutes(1).toMilliseconds());
//TODO checkpoints之间的最小时间:该属性定义在checkpoint之间需要多久的时间,以确保流应用在checkpoint之间有足够的进展。如果值设置为了5000,
// 无论checkpoint持续时间与间隔是多久,在前一个checkpoint完成时的至少五秒后会才开始下一个checkpoint。这个值也意味着同时执行的checkpoint的数目是1。
checkpointConfig.setMinPauseBetweenCheckpoints(Time.minutes(1).toMilliseconds());
//TODO 并发checkpoint的数目: 允许多个checkpoint并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的checkpoint去最小化故障后重跑的pipelines来说,是有意义的。
// 该属性不能和上个属性同时使用。
checkpointConfig.setMaxConcurrentCheckpoints(1);
//TODO 在checkpoint出错时使task失败或者继续进行task:他决定了在checkpoint的过程中发生错误时,是否使task也失败,使失败是默认的行为。禁用它时,这个任务将会简单的把checkpoint错误信息报告给checkpoint coordinator并继续运行。
checkpointConfig.setTolerableCheckpointFailureNumber(0);
//TODO 启用未对齐的检查点,只有在CheckpointingMode.EXACTLY_ONCE并且最大并发数为1的时候才能启用未对齐的检查点,未对齐的检查点详细请看8.4.3.3
checkpointConfig.enableUnalignedCheckpoints();
// TODO 对齐检查点超时,激活时,每个检查点仍将作为对齐检查点开始,但如果全局检查点开始与子任务检查点开始之间的时间超过对齐检查点超时,则检查点将作为未对齐检查点进行。只有在启用未对齐的检查点时才使用。
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
# 上面的程序中定义的参数也可以通过如下的任务启动参数指定。
-Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION # 当作业取消时,保留作业的checkpoint。
-Dstate.checkpoints.num-retained=2 # 当作业取消时,保留作业的checkpoint并且设置这个参数的话,那么checkpoint只保存最后一份。
-Dexecution.checkpointing.mode=EXACTLY_ONCE # 检查点模式,默认为EXACTLY_ONCE,也可以设置为AT_LEAST_ONCE。
-Dexecution.checkpointing.tolerable-failed-checkpoints=0 # 可容忍的检查点连续失败数。
-Dexecution.checkpointing.timeout=1min #检查点超时时间
-Dexecution.checkpointing.interval=1min # 检查点之间的间隔时间
-Dexecution.checkpointing.max-concurrent-checkpoints=1 # 并发checkpoint的数目
-Dexecution.checkpointing.min-pause=1min # checkpoints之间的最小时间
-Dexecution.checkpointing.snapshot-compression=true # 开启状态快照的压缩
-Dexecution.checkpointing.unaligned=true # 启用未对齐的检查点,只有在CheckpointingMode.EXACTLY_ONCE并且最大并发数为1的时候才能启用未对齐的检查点,未对齐的检查点详细请看8.4.3.3
-Dexecution.checkpointing.aligned-checkpoint-timeout=30s #对齐检查点超时,激活时,每个检查点仍将作为对齐检查点开始,但如果全局检查点开始与子任务检查点开始之间的时间超过对齐检查点超时,则检查点将作为未对齐检查点进行。只有在启用未对齐的检查点时才使用。
8.4.3 Checkpoint执行原理
Flink容错机制的核心部分是对分布式数据流和算子状态绘制一致的快照。这些快照充当一致的检查点,系统可以在发生故障时回退到这些检查点。Flink的Checkpoint使用Chandy-Lamport algorithm算法的一种变体,称为异步barrier快照。Flink分布式快照的核心元素是Stream barrier。
8.4.3.1 Stream barrier
JobManager上的Checkpoint Coordinator(检查点协调器)向TaskManager发送开始Checkpoint的指令,它会让所有的Source记录它们的偏移量,并将带快照编号的Checkpoint barrier注入到它们的流中。并作为数据流的一部分与记录一起流动。barrier永远不会超过记录,它们严格按照顺序流动。barrier将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录。快照的记录被推送到它前面。barrier不会中断流的流动,因此非常轻量级。来自不同快照的多个barrier可以同时在流中,这意味着各种快照可能同时发生。来自不同流的多个barrier会进行对齐称为对齐检查点,不对齐的话称为未对齐的检查点。
8.4.3.2 对齐检查点
如上图,在每个operator上都有一个输入流和输出流,接收多个输入流的算子需要在Checkpoint barrier上对齐输入流。一旦从输入流接收到Checkpoint barrier,它就无法处理来自该流的任何其他数据,直到它也从其他输入流接收到Checkpoint barrier。一旦所有输入流都接收到Checkpoint barrier,那么operator就会发出所有待处理的数据,然后自己发出Checkpoint barrier到下游算子并解除快照状态处理来自上游operator的记录,在处理来自上游operator记录之前会先处理自己输入流的缓存数据。最后,operator会以写时复制的机制将状态异步写入状态后端。
8.4.3.3 未对齐的检查点
Checkpoint也可以在未对齐的情况下执行。基本思想是,只要传输中的数据成为Operator状态的一部分,Checkpoint就可以超越所有传输中的数据。
如上图。operator对存储在其输入缓冲区中的第一个barrier做出反应。它通过将barrier添加到输出缓冲区的末尾来立即将barrier转发给下游的operator。operator将所有被超越的记录标记为异步存储,并创建其自身状态的快照。因此,operator只是短暂地停止输入处理以标记缓冲区,转发barrier,并创建其他状态的快照。
未对齐的检查点可确保barrier尽快到达汇点。它特别适用于至少有一个缓慢移动的数据路径的应用程序,其中对齐时间可能长达数小时。然而,由于它增加了额外的I/O压力,存储到状态后端的数据量包括了输入缓冲区输出缓冲区的数据。当状态后端的I/O成为瓶颈时,未对齐的检查点就没啥用了。
8.4.4 自动重启策略
在Flink程序运行过程中,可能会遇到一些不确定的异常,我们想让Flink可以忽略一些细微的问题,能够自动从上一次的任务状态中恢复过来接着执行,也就是希望Flink能够根据指定的重启策略自动的从Checkpoint恢复。
如果没有启用checkpoint,就采用"不重启"策略。如果启用了checkpoint且没有配置重启策略,那么就采用固定延时重启策略,此时最大尝试重启次数Integer.MAX_VALUE 参数设置。
8.4.4.1 Fixed Delay Restart Strategy(固定延时重启策略)
固定延时重启策略按照给定的次数尝试重启作业。 如果尝试超过了给定的最大次数,作业将最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。有以下两种方式。
# 通过在flink-conf.yaml中设置如下配置参数,默认启用此策略。
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3 # 尝试重启的次数
restart-strategy.fixed-delay.delay: 10 s # 延时10s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
8.4.4.2 Failure Rate Restart Strategy(故障率重启策略)
故障率重启策略在故障发生之后重启作业,但是当故障率(每个时间间隔发生故障的次数)超过设定的限制时,作业会最终失败。在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。有以下两种方式。
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3 # 每个时间间隔的最大故障次数
restart-strategy.failure-rate.failure-rate-interval: 5 min # 测量故障率的时间间隔
restart-strategy.failure-rate.delay: 10 s # 延时
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
));
8.4.4.3 No Restart Strategy(无重启策略)
作业直接失败,不尝试重启。
restart-strategy: none
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
8.4.4.4 Fallback Restart Strategy(群集定义的重启策略)
如果为Standalone模式的话就使用flink-conf.yaml配置的重启策略,如果使用的Yarn,则就Yarn的任务重启策略。
8.4.5 保存点(Savepoints)
Savepoint是依据Checkpoint机制所创建的流作业执行状态的一致镜像。你可以使用Savepoint进行Flink作业的停止与重启、fork 或者更新。
8.4.5.1 Savepoint和Checkpoint的区别。
Savepoint由用户创建,拥有和删除。他们的用例是计划的,手动备份和恢复。 例如,升级Flink版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。当然,Savepoint 必须在作业停止后继续存在。从概念上讲,Savepoint的生成,恢复成本可能更高一些,Savepoint更多地关注可移植性和对前面提到的作业更改的支持。
Checkpoint的主要目的是为意外失败的作业提供恢复机制。Checkpoint的生命周期由Flink管理,即Flink创建,管理和删除Checkpoint无需用户交互。作为一种恢复和定期触发的方法。Checkpoint不支持扩缩容。Checkpoint使用state backend特定的数据格式,可能以增量方式存储。
在Flink1.13版本统一了Savepoint的二进制格式,这意味着你可以生成Savepoint并且之后使用另一种state backend读取它。
强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String)方法手动指定算子ID 。这些ID将用于恢复每个算子的状态。
DataStream<String> stream = env.
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
.print();
8.4.5.2 Savepoint相关命令
bin/flink savepoint <jobId> <state.savepoints.dirs> # 触发savepoint
bin/flink savepoint <jobId> <state.savepoints.dirs> -yid <Yarn-application-Id> #使用Yarn触发 Savepoint
bin/flink cancel -s <state.savepoints.dirs> <jobId> # 这将触发指定ID的作业的savepoint并把savepoint的结果存储在指定的savepoint目录中。该目录需要JobManager和TaskManager可以访问。
bin/flink savepoint -d <state.savepoints.dirs> #删除savepoint
8.4.6 状态一致性
有状态的流处理,内部每个算子任务都可以有自己的状态。对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。一条数据不应该丢失,也不应该重复计算。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
8.4.7 状态一致性分类
8.4.7.1 AT-MOST-ONCE(最多一次)
当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。At-most-once语义的含义是最多处理一次事件。
8.4.7.2 AT-LEAST-ONCE(至少一次)
在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
8.4.7.3 EXACTLY-ONCE(精确一次)
恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
8.4.8 端到端精确一次保证
端到端精确一次保证除了Flink内部需要通过Checkpoint来保证以外还需要外部的Source和Sink来保证。
8.4.8.1 Source
Source需要可以重设数据的读取位置,例如Kafka、Mysql binlog等等组件。
8.4.8.2 Sink
1. 幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。但是在故障恢复的时候会出现短暂的类似数据回滚的现象。
2. 事务写入
在Flink中事务写入提供了两种方式,一种是预写日志:把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统;另一种是两阶段提交:对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里,然后将这些数据写入外部sink系统,但不提交它们,这时只是"预提交"。当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入。预写日志在Flink提供了GenericWriteAheadSink模板类来实现这种方式,但是因为没有事务,如果提交之后出现错误会导致数据丢失。而两阶段提交真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口。
8.4.8.3 两阶段提交对外部Sink系统的要求
1. 外部Sink系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务。
2. 在Checkpoint的间隔期间里,必须能够开启一个事务并接受数据写入。
3. 在收到Checkpoint完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候Sink系统关闭事务(例如超时了),那么未提交的数据就会丢失。
4. Sink任务必须能够在进程失败后恢复事务。
5. 提交事务必须是幂等操作。
8.4.8.4 两阶段提交Checkpoint执行流程Sink的相关操作
1. Sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把状态保存到状态后端,并开启新的预提交事务。
2. 当所有算子任务的快照完成,也就是这次的Checkpoint完成时,JobManager会向所有任务发通知,确认这次Checkpoint完成。
3. Sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改为“已确认”。
9 CEP(Complex Event Processing)
FlinkCEP是在Flink上层实现的复杂事件处理库。它可以让你在无限事件流中检测出特定的事件模型。
9.1 模式API
要使用CEP需要在pom.xml文件中引入如下依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.12.7</version>
</dependency>
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
9.1.1 单个模式
一个模式可以是一个单例或者循环模式。单例模式只接受一个事件,循环模式可以接受多个事件。默认情况下,模式都是单例的,你可以通过使用量词把它们转换成循环模式。 每个模式可以有一个或者多个条件来决定它接受哪些事件。每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。模式的名字不能包含字符":"。
9.1.1.1 模式操作where(condition)
为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句和SQL语句的AND关键词作用相同。
pattern.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // 一些判断条件
}
});
9.1.1.2 模式操作or(condition)
增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式:
pattern.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // 一些判断条件
}
}).or(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // 替代条件
}
});
9.1.1.3 模式操作until(condition)
为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。只适用于和oneOrMore()同时使用。在基于事件的条件中,它可用于清理对应模式的状态。
pattern.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // 替代条件
}
});
9.1.1.4 模式操作subtype(subClass)
为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式:
pattern.subtype(SubEvent.class);
9.1.1.5 模式操作oneOrMore()
指定模式期望匹配到的事件至少出现一次。推荐使用until()或者within()来清理状态。
pattern.oneOrMore();
9.1.1.6 模式操作timesOrMore(#times)
指定模式期望匹配到的事件至少出现#times次。
pattern.timesOrMore(2);
9.1.1.7 模式操作times(#ofTimes)
指定模式期望匹配到的事件正好出现的次数。
pattern.times(2);
9.1.1.8 模式操作times(#fromTimes, #toTimes)
指定模式期望匹配到的事件出现次数在#fromTimes和#toTimes之间。
pattern.times(2, 4);
9.1.1.9 模式操作optional()
指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。
pattern.oneOrMore().optional();
9.1.1.10 模式操作greedy()
指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。
pattern.oneOrMore().greedy();
9.1.2 组合模式
Pattern<Event, ?> start = Pattern.<Event>begin("start");
FlinkCEP支持事件之间使用如下的连续策略。模式序列不能以notFollowedBy()结尾。一个NOT模式前面不能是可选的模式。
9.1.2.1 严格连续next()
期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
Pattern<Event, ?> strict = start.next("middle").where(...);
9.1.2.2 松散连续followedBy()
忽略匹配的事件之间的不匹配的事件。松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上。松散连续会”跳过不匹配的事件直到匹配上的事件”。
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
9.1.2.3 不确定的松散连续followedByAny()
更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。例如模式“a b”,事件序列“a c b1 b2”,则输出的匹配事件为 {a b1}, {a b2}。
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
9.1.2.4 严格连续的NOT模式notNext()
如果不想后面直接连着一个特定事件。
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
9.1.2.5 松散连续的NOT模式notFollowedBy()
如果不想一个特定事件发生在两个事件之间的任何地方。
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
可以为模式定义一个有效时间约束。例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间。一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。
next.within(Time.seconds(10));
9.1.3 模式组
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// 严格连续
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// 松散连续
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// 不确定松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
9.2 从模式中选取
在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。推荐使用PatternProcessFunction。PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时,对一个模式会有不止一个事件被接受。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
IN startEvent = match.get("start").get(0);
IN endEvent = match.get("end").get(0);
out.collect(OUT(startEvent, endEvent));
}
}
9.3 处理超时的部分匹配
当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。processTimedOutMatch不能访问主输出。但你可以通过Context对象把结果输出到侧输出。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
...
}
@Override
public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
IN startEvent = match.get("start").get(0);
ctx.output(outputTag, T(startEvent));
}
}
9.4 CEP库中的时间
在CEP中,事件的处理顺序很重要。在使用事件时间时,为了保证事件按照正确的顺序被处理,一个事件到来后会先被放到一个缓冲区中, 在缓冲区里事件都按照时间戳从小到大排序,当水位线到达后,缓冲区中所有小于水位线的事件被处理。这意味着水位线之间的数据都按照时间戳被顺序处理。这也意味着水位线设置的最大延迟时间能涵盖所有的乱序数据。如果有晚到的数据的话,晚到的事件不会被处理。你也可以指定一个侧输出标志来收集比最新水位线晚到的事件。
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag)
.select(
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
10 双流Join
10.1 union
//TODO 直接将多条流合在一起,叫作流的“联合”(union)联合操作要求必须流中的数据类型必须相同,对于合流之后的水位线,也是要以最小的那个为准,多流合并时处理的时效性是以最慢的那个流为准的。在实际场景中,可能相同的数据来自不同的数据源,就可以使用这种方式。
stream1.union(stream2, stream3, ...)
10.2 connect
直接把两条流像接线一样对接起来,连接操作允许流的数据类型不同。
10.2.1 和普通流连接
//TODO 连接之后被转换为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
//对于ConnectedStreams,我们需要重写CoMapFunction、CoFlatMapFunction或CoProcessFunction。CoMapFunction和CoFlatMapFunction这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。
//Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。在这里我选择CoProcessFunction作为示例。我认为在工作中这种场景对于connect可能使用较多。
// 检测同一支付单在两条流中是否匹配,不匹配就报警
appStream.connect(thirdpartStream)
.keyBy(data -> data.f0, data -> data.f0)
.process(new CoProcessFunction<Tuple3<String, String, Long>,Tuple4<String, String, String, Long>, String>{
// 定义状态变量,用来保存已经到达的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String,String,String,Long>> thirdPartyEventState;
@Override
public void open(Configuration parameters) throws Exception {
//初始化两个状态
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String,String, Long>>(
"app-event",
Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String,Long>>(
"thirdparty-event",
Types.TUPLE(Types.STRING,Types.STRING,Types.STRING,Types.LONG)));
}
@Override
public void processElement1(Tuple3<String, String, Long> value,
Context ctx, Collector<String> out) throws Exception {
// 看另一条流中事件是否来过
if(thirdPartyEventState.value() != null){
out.collect("对账成功:" + value + " " +thirdPartyEventState.value());
thirdPartyEventState.clear(); // 清空状态
} else {
appEventState.update(value); // 更新状态
// 注册一个5秒后的定时器,开始等待另一条流的事件
ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value,
Context ctx, Collector<String> out) throws Exception {
if (appEventState.value() != null){
out.collect("对账成功:" + appEventState.value() + " " + value);
appEventState.clear(); // 清空状态
} else {
thirdPartyEventState.update(value); // 更新状态
// 注册一个5秒后的定时器,开始等待另一条流的事件
ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
if (appEventState.value() != null) {
out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
}
if (thirdPartyEventState.value() != null) {
out.collect("对账失败:"+thirdPartyEventState.value()+" "+"app信息未到");
}
appEventState.clear();
thirdPartyEventState.clear();
}
}).print();
10.2.2 和广播流连接
//TODO 如果连接的是一个广播流,这时合并两条流得到的就变成了一个“广播连接流”。这种连接方式往往用在维表匹配的场景。因为维表不是不变的,所以我们可以用一个单独的流来获取维表数据;而这些数据是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)。可以使用Flink-cdc技术实时的监控维表的存储库来获取维表数据。示例:
//将ruleStream广播出去,这里需要传入一个MapState的描述器参数
MapStateDescriptor<String,Rule> ruleStateDescriptor=new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream=ruleStream.broadcast(ruleStateDescriptor);
stream.connect(ruleBroadcastStream).process(new BroadcastProcessFunction<>() {...} );
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx,
Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx,
Collector<OUT> out) throws Exception;
...
}
10.3 coGroup
10.4 Window Join
10.4.1 Tumbling Window Join
当执行滚动窗口连接时,具有公共键和公共滚动窗口的所有元素将作为成对组合连接并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以在一个流的滚动窗口中没有来自另一个流的元素的元素不会被释放。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
10.4.2 Sliding Window Join
当执行滑动窗口连接时,具有公共键和公共滑动窗口的所有元素将作为成对组合连接并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发出。注意,有些元素可能在一个滑动窗口中被连接,但在另一个滑动窗口中却没有。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
10.4.3 Session Window Join
当执行会话窗口连接时,具有相同键的所有元素在“组合”时满足会话条件,将以成对组合的方式连接并传递给JoinFunction或FlatJoinFunction。同样,这将执行一个内部连接,因此如果有一个会话窗口只包含来自一个流的元素,则不会发出输出。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
10.5 Interval Join
Interval Join使用一个公共键(key)连接两个流,Interval Join目前只支持事件时间,Interval Join目前只支持内部连接,也就是说两个流能Join上的流会返回一个笛卡尔积。返回的数据会传递给ProcessJoinFunction。
//在上面的例子中,我们将两个流“橙色”和“绿色”连接在一起,下界为-2毫秒,上界为+1毫秒。默认情况下,这些边界是包含的,但是. lowerboundexclusive()和upperboundexclusive可以被应用来改变行为。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
11 Table API & SQL
12 资源配置调优
12.1 配置TaskManager内存
在开发过程中可以通过Web UI的TaskManager上面的内存使用情况来使用如下参数调整各个内存的使用情况。如果状态后端使用的不是RocksDB,可以把taskmanager.memory.managed.size设置为0。
12.1.1 进程总内存(Total Process Memory)
taskmanager.memory.process.size 默认1728m,也就是flink-conf.yaml文件中的内存初始大小。
12.1.2 Flink总内存(Total Flink Memory)
可以根据进程总内存的默认值以及JVM Metaspace和JVM开销的默认值算出来Flink总内存的默认值为1280M。也可以通过taskmanager.memory.flink.size参数设置Flink总内存。
12.1.3 框架堆内存(Framework Heap Memory)
taskmanager.memory.framework.heap.size 默认128MB。
12.1.4 任务堆内存(Task Heap Memory)
taskmanager.memory.task.heap.size 默认none,由Flink内存扣除掉其他部分的内存得到。
12.1.5 托管内存(Managed Memory)
流处理作业中用于RocksDB State Backend以及流处理和批处理作业中用于排序、哈希表及缓存中间结果。
taskmanager.memory.managed.fraction 默认0.4。
taskmanager.memory.managed.size 默认none。
如果size没指定,则等于Flink内存*fraction。
12.1.6 框架堆外内存(Framework Off-heap Memory)
taskmanager.memory.framework.off-heap.size 默认128MB。
12.1.7 任务堆外内存(Task Off-heap Memory)
taskmanager.memory.task.off-heap.size 默认0,表示不使用堆外内存。
12.1.8 网络内存(Network Memory)
taskmanager.memory.network.fraction 默认0.1。
taskmanager.memory.network.min 默认64mb。
taskmanager.memory.network.max 默认1gb。
Flink内存*fraction,如果小于配置的min(或大于配置的max)大小,则使用min/max大小。
12.1.9 JVM Metaspace
taskmanager.memory.jvm-metaspace.size 默认256M。
12.1.10 JVM开销
JVM执行时自身所需要的内存,包括线程堆栈、IO、编译缓存等所使用的内存。
taskmanager.memory.jvm-overhead.fraction 默认0.1。
taskmanager.memory.jvm-overhead.min 默认192MB。
taskmanager.memory.jvm-overhead.max 默认1G。
总进程内存*fraction,如果小于配置的min(或大于配置的max)大小,则使用min/max大小。
12.2 并行度设置
数据源端是Kafka,Source的并行度设置为Kafka对应Topic的分区数。如果已经等于Kafka的分区数,消费速度仍跟不上数据生产速度,考虑下Kafka要扩大分区,同时调大并行度等于分区数。Flink的一个并行度可以处理一至多个分区的数据,如果并行度多于Kafka的分区数,那么就会造成有的并行度空闲,浪费资源。
keyBy之前的算子一般都是简单转换算子,和Source保持一致,KeyBy之后的算子,如果业务逻辑复杂,并且数据量很大,建议设置并行度为2的整数次幂,如果没有KeyBy之类的重分区算子,并行度也无需设置为2的整数次幂。
Sink端可以根据发送到Sink端的数据量以及下游服务的抗压能力来调整并行度。
可以通过监控指标numRecordsOutPerSecond来统计每秒钟系统处理的数据量的具体情况。
13 RocksDB调优
RocksDB是基于LSM Tree实现的(类似HBase),写数据都是先缓存到内存中,所以RocksDB的写请求效率比较高。RocksDB使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中blockcache中查找,如果内存中没有再去磁盘中查询。使用RocksDB时,状态大小仅受可用磁盘空间量的限制,性能瓶颈主要在于RocksDB对磁盘的读请求,每次读写操作都必须对数据进行反序列化或者序列化。
13.1 开启State访问性能监控
开启State访问性能监控主要对自己创建的状态的读取和更新的延迟进行统计,单位为纳秒(ns), 1 m s = 1 0 − 6 n s 1ms = 10^-6ns 1ms=10−6ns。
如上图,0表示subTask的编号,Map表示当前的算子为map,vi表示在作业中定义的定义的状态名,valueState表示定义的状态为valueState。
# 在启动参数中增加如下的参数就会开启State访问性能监控。性能监控会产生一定的性能已经,RocksDB性能损失大概在1%,hashmap性能损失能达到10%。测试环境可以测试,生产环境不开启。
-Dstate.backend.latency-track.keyed-state-enabled=true
上图是监控的最大、平均、以及最小延迟,按最大延迟44000计算的话,读取延迟是0.044ms。
13.2 开启本地恢复
目前,任务本地恢复仅涵盖键控状态后端。键控状态通常是状态的最大部分。RocksDB因为状态本来就存储在本地目录中,所以天然支持本地恢复,HashMapStateBackend因为状态是存储在JVM堆上的,如果要支持本地恢复的话,需要把状态再向本地文件中写一份作为副本文件。可以在任务启动参数中增加如下参数:
# 开启本地恢复 注意:未对齐的检查点目前不支持任务本地恢复。
-Dstate.backend.local-recovery=true \
# RocksDB因为状态是存储在本地的磁盘目录中,可以配置多块磁盘目录,这样会增大RocksDB的读写IO,也可以把磁盘配置为SSD,可以更大的增加磁盘的访问速度。
-Dstate.backend.rocksdb.localdir=/data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
13.3 调整预定义选项
Flink针对不同的设置为RocksDB提供了一些预定义的选项集合,其中包含了后续提到的一些参数,如果调整预定义选项后还达不到预期的效果,再去调整后续的一些参数。通过如下方式设置。
EmbeddedRocksDBStateBackend rocksdb = new EmbeddedRocksDBStateBackend();
rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
//PredefinedOptions类是一个枚举类,提供了如下配置DEFAULT、SPINNING_DISK_OPTIMIZED、SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED
# 也可以通过提交的任务参数指定
# 设置为机械硬盘+内存模式
-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM
13.4 开启分区索引功能
通过分区,SST文件的索引/筛选块被划分为更小的块,并在这些块上附加一个顶级索引。当读取索引/筛选器时,只有顶级索引被加载到内存中。然后,分区索引/过滤器使用顶级索引按需将执行索引/过滤器查询所需的分区加载到块缓存中。
# 通过如下启动参数开启分区索引功能
-Dstate.backend.rocksdb.memory.partitioned-index-filters=true
14 反压处理
14.1 反压的理解
Flink每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞。反压(BackPressure)通常产生于这样的场景:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增。
14.2 反压的危害
反压如果不能得到正确的处理,可能会影响到checkpoint时长和state大小,甚至可能会导致资源耗尽甚至系统崩溃。
1. 影响checkpoint时长:barrier不会越过普通数据,数据处理被阻塞也会导致checkpoint barrier流经整个数据管道的时长变长,导致checkpoint总体时间变长。
2. 影响state大小:barrier对齐时,接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达,这些被缓存的数据会被放到state里面,导致checkpoint变大。
这两个影响对于生产环境的作业来说是十分危险的,因为checkpoint是保证数据一致性的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小同样可能拖慢checkpoint甚至导致OOM(使用Heap-based StateBackend)或者物理内存使用超出容器资源(使用RocksDBStateBackend)的稳定性问题。
因此,我们在生产中要尽量避免出现反压的情况。
14.3 定位反压节点
解决反压首先要做的是定位到造成反压的节点,排查的时候,先把operator chain禁用,方便定位到具体算子。
# 在不改动代码的前提下可以增加启动参数禁用operator chain
-Dpipeline.operator-chaining=false
14.3.1 利用Web UI定位
在Flink1.13版本开始,优化了反压检测的逻辑,重新实现了作业图的UI展示,Flink现在通过颜色和数值来展示繁忙和反压的程度。反压通过UI展示图体现的两种方式:
1. 该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的Operator(比如flatmap)。这种情况,该节点是反压的根源节点,后续的算子都会反压。
2. 下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况,需要继续排查下游节点,一直找到第一个反压(BackPressure)状态为OK的一般就是根源节点。
如果上述的方法还无法确定,还可以结合Metrics进一步判断。
14.3.2 利用Metrics定位
监控反压时会用到的Metrics主要和Channel接受端的Buffer使用率有关,最为重要的监控指标为outPoolUsage(发送端Buffer的使用率)和inPoolUsage(接收端Buffer的使用率)。反压具体情况可以查询下面表格:
SubTask | outPoolUsage 低 | outPoolUsage 高 |
---|---|---|
inPoolUsage 低 | 正常 | 被下游反压,处于临时情况(还没传递到上游)。 可能是反压的根源,一条输入多条输出的场景。 |
inPoolUsage 高 | 如果上游所有outPoolUsage都是低,有可能最终可能导致反压(还没传递到上游)。 如果上游的outPoolUsage是高,则为反压根源。 | 被下游反压 |
14.4 反压的解决方案
14.4.1 查看是否数据倾斜
在实践中,很多情况下的反压是由于数据倾斜造成的,这点我们可以通过Web UI各个SubTask的Records Sent和Record Received来确认。如下图
14.4.2 使用火焰图分析
# 在任务启动命令上添加如下参数
-Drest.flamegraph.enabled=true
火焰图是通过对堆栈跟踪进行多次采样来构建的。每个方法调用都由一个条形表示,其中条形的长度与其在样本中出现的次数成正比。On-CPU: 处于[RUNNABLE,NEW]状态的线程;Off-CPU:处于[TIMED_WAITING,WAITING,BLOCKED]的线程,用于查看在样本中发现的阻塞调用。纵向是调用链,从下往上,顶部就是正在执行的函数。横向是样本出现次数,可以理解为执行时长。看顶层的哪个函数占据的宽度最大。只要有"平顶",就表示该函数可能存在性能问题。
14.4.3 分析GC情况
TaskManager的内存以及GC问题也可能会导致反压,包括TaskManager JVM各区内存不合理导致的频繁Full GC甚至失联。通常建议使用默认的G1垃圾回收器。
# 在提交脚本中增加如下参数
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
# 如果是Yarn模式,运行的节点一个一个找比较麻烦。可以打开WebUI,选择JobManager或者 TaskManager,点击Stdout,即可看到GC日志,重点查看是否包含有Full GC以及Full GC出现的频率和一次Full GC执行的时间等信息。
15 数据倾斜
数据倾斜可以通过上面介绍反压的解决方案来判断是否存在数据倾斜。数据倾斜总体上分为三种类型:keyBy后的聚合操作存在数据倾斜、keyBy之前发生数据倾斜、keyBy后的窗口聚合操作存在数据倾斜。下面分别对这三种数据倾斜给出解决方案。
15.1 keyBy后的聚合操作存在数据倾斜
在keyBy上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得keyBy之后的聚合操作不再是任务的瓶颈。类似MapReduce中Combiner的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从Flink LocalKeyBy实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。所以输入输出方式应该是多次输入一次输出,在这里可以使用FlatMap并且实现CheckpointedFunction接口来自定义状态。
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class LocalKeyByFlatMapFunc extends RichFlatMapFunction<Tuple2<String,
Long>, Tuple2<String, Long>> implements CheckpointedFunction {
//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
private ListState<Tuple2<String, Long>> listState;
//本地 buffer,存放 local 端缓存的 mid 的 count 信息
private HashMap<String, Long> localBuffer;
//缓存的数据量大小,即:缓存多少数据再向下游发送
private int batchSize;
//计数器,获取当前批次接收的数据量
private AtomicInteger currentSize;
//构造器,批次大小传参
public LocalKeyByFlatMapFunc(int batchSize) {
this.batchSize = batchSize;
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>>
out) throws Exception {
// 1、将新来的数据添加到 buffer 中
Long count = localBuffer.getOrDefault(value, 0L);
localBuffer.put(value.f0, count + 1);
// 2、如果到达设定的批次,则将 buffer 中的数据发送到下游
if (currentSize.incrementAndGet() >= batchSize) {
// 2.1 遍历 Buffer 中数据,发送到下游
for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
out.collect(Tuple2.of(midAndCount.getKey(),
midAndCount.getValue()));
}
// 2.2 Buffer 清空,计数器清零
localBuffer.clear();
currentSize.set(0);
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
listState.clear();
for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
listState.add(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 从状态中恢复 buffer 中的数据
listState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<Tuple2<String, Long>>(
"localBufferState",
Types.TUPLE(Types.STRING, Types.LONG)
)
);
localBuffer = new HashMap();
if (context.isRestored()) {
// 从状态中恢复数据到 buffer 中
for (Tuple2<String, Long> midAndCount : listState.get()) {
// 如果出现 pv != 0,说明改变了并行度,ListState 中的数据会被均匀分发到
新的 subtask 中
// 单个 subtask 恢复的状态中可能包含多个相同的 mid 的 count 数据
// 所以每次先取一下 buffer 的值,累加再 put
long count = localBuffer.getOrDefault(midAndCount.f0, 0L);
localBuffer.put(midAndCount.f0, count + midAndCount.f1);
}
// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
currentSize = new AtomicInteger(batchSize);
} else {
currentSize = new AtomicInteger(0);
}
}
}
15.2 keyBy之前发生数据倾斜
如果keyBy之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因Kafka的Topic中某些Partition的数据量较大,某些Partition的数据量较少。对于不存在keyBy的Flink任务也会出现该情况。
这种情况,需要让Flink任务强制进行shuffle。使用shuffle、rebalance或 rescale。算子即可将数据均匀分配,从而解决数据倾斜的问题。
15.3 keyBy后的窗口聚合操作存在数据倾斜
因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合并且要获取WindowEnd作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起。
第二阶段聚合:按照原来的key及windowEnd作keyby聚合。
16 Job优化
16.1 链路延迟测量
对于实时的流式处理系统来说,我们需要关注数据输入、计算和输出的及时性,所以处理延迟是一个比较重要的监控指标,特别是在数据量大或者软硬件条件不佳的环境下。Flink
提供了开箱即用的LatencyMarker机制来测量链路延迟。开启如下参数:
# 定义从源发出延迟跟踪标记的间隔,单位为ms,启用此功能会显著影响群集的性能。
-Dmetrics.latency.interval=30000
# 监控的粒度
# single:每个算子单独统计延迟。
# operator(默认值):每个下游算子都统计自己与Source算子之间的延迟。
# subtask:每个下游算子的SubTask都统计自己与Source算子的SubTask之间的延迟。
-Dmetrics.latency.granularity=operator
# 一般情况下采用默认的operator粒度即可,这样在Sink端观察到的latency metric就是我们最想要的全链路(端到端)延迟。subtask粒度太细,会增大所有并行度的负担,不建议使用。
# 延迟的单位为ms
# flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
# 这个监控通过Web UI查看不了,可以通过Prometheus 和 Grafana来进行监控。
16.2 开启对象重用
开启对象重用就是把中间深拷贝的步骤都省略掉,但是不是任何时候都可以开启对象重用的,只有在这一个对象只会被下游的一个Function处理或者下游的所有Function均不会改变对象内部的值,才可以开启对象重用,不然会导致线程安全的问题。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
# 除了可以在代码中指定开启对象重用外,还可以通过启动参数的形式启用。
-Dpipeline.object-reuse=true
16.3 细粒度滑动窗口优化
在滑动窗口中,我们知道一条数据到底属于几个窗口,取决于窗口大小和滑动步长的商,如果窗口长度远远大于滑动步长的话,这样一条窗口可能属于非常多的窗口,性能这时候就会急剧下降。窗口长度远远大于滑动步长的话对状态的存储有非常大的压力。
在具体业务中,我们一般使用滚动窗口+在线存储+读时聚合的思路作为解决方案:
1. 以滑动步长大小为滚动窗口的窗口长度。
2. 每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储。
3. 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。
17 Flink SQL调优
18 网络缓冲调优
18.1 概述
Flink中每条消息都会被放到网络缓冲(network buffer)中,并以此为最小单位发送到下一个Subtask。为了维持连续的高吞吐,Flink在传输过程的输入端和输出端使用了网络缓冲队列。
每个Subtask都有一个输入队列来接收数据和一个输出队列来发送数据到下一个Subtask。拥有更多的中间缓存数据可以使Flink提供更高、更富有弹性的吞吐量,但是也会增加快照时间。
18.2 缓冲消胀机制
Flink1.14新引入的缓冲消胀机制尝试通过自动调整缓冲数据量到一个合理值。缓冲消胀功能计算Subtask可能达到的最大吞吐(始终保持繁忙状态时)并且通过调整缓冲数据量来使得数据的消费时间达到配置值。
-Dtaskmanager.network.memory.buffer-debloat.enabled=true # 开启缓冲消胀机制
-Dtaskmanager.network.memory.buffer-debloat.target=1s # 指定下游算子消费缓冲数据所消耗的时间,默认为1s 。这个值会与测量的吞吐量结合使用。这个功能使用过去的吞吐数据来预测消费剩余缓冲数据的时间。如果预测不准,缓冲消胀机制会导致以下问题:没有足够的缓存数据来提供全量吞吐;有太多缓冲数据对checkpoint barrier推进或者非对齐的checkpoint的大小造成不良影响。
# 如果您的作业负载经常变化(即,突如其来的数据尖峰,定期的窗口聚合触发或者join)可能需要调整一下参数,
-Dtaskmanager.network.memory.buffer-debloat.period=500ms # 这是缓冲区大小重算的最小时间周期。周期越小,缓冲消胀机制的反应时间就越快,但是必要的计算会消耗更多的CPU。默认值为500ms
-Dtaskmanager.network.memory.buffer-debloat.samples=20 # 调整用于计算平均吞吐量的采样数。样本数越少,缓冲消胀机制的反应时间就越快,但是当吞吐量突然飙升或者下降时,缓冲消胀机制计算的最佳缓冲数据量会更容易出错。
-Dtaskmanager.network.memory.buffer-debloat.threshold-percentages=50 # 防止缓冲区大小频繁改变的优化(比如,新的大小跟旧的大小相差不大)。默认值为50,百度分大于50的话会调整下游算子消费缓冲数据所消耗的时间
# 当开启缓冲消胀机制后可以通过Metrics查看这如下两个指标的大小:
estimatedTimeToConsumeBuffersMs # 消费所有输入通道(input channel)中数据的总时间
debloatedBufferSize # 当前的缓冲区大小。
19 常见报错
19.1 非法配置异常
如果您看到从TaskExecutorProcessUtils或JobManagerProcessUtils抛出的IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于1的
分数等)或配置冲突。请重新配置内存参数。
19.2 Java堆空间异常
如果报OutOfMemoryError:Java heap space异常,通常表示JVM Heap太小。可以尝试通过增加总内存来增加JVM堆大小。也可以直接为TaskManager增加任务堆内存或为JobManager增加JVM堆内存。具体参数查看资源配置调优。
19.3 直接缓冲存储器异常
如果报OutOfMemoryError:Direct buffer memory异常,通常表示JVM直接内存限制太小或存在直接内存泄漏。检查用户代码或其他外部依赖项是否使用了JVM直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。具体参数查看资源配置调优。
19.4 元空间异常
如果报OutOfMemoryError:Metaspace异常,通常表示JVM元空间限制配置得太小。您可以尝试加大JVM元空间。具体参数查看资源配置调优。
19.5 网络缓冲区数量不足
如果报IOException:Insufficient number of network buffers异常,这仅与TaskManager相关。通常表示配置的网络内存大小不够大。您可以尝试增加网络内存。具体参数查看资源配置调优。
19.6 Kafka动态发现分区
开启Partition的动态发现,该参数表示间隔多久检测一次是否有新创建的partition。默认值是Long的最小值,表示不开启,大于0表示开启。开启时会启动一个线程根据传入的interval定期获取Kafka最新的元数据,新partition对应的那一个Subtask会自动发现并从earliest位置开始消费,新创建的partition对其他subtask并不会产生影响。
KafkaSource<String> build = KafkaSource.<String>builder()
.setBootstrapServers("")
.setTopics("")
.setGroupId("")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "")
.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,Time.seconds(30).toMilliseconds()+"")
.build();
19.7 超出文件描述符限制
java.io.IOException: Too many open files
首先检查Linux系统ulimit -n的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。
19.8 脏数据导致数据转发失败
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如POJO内有空字段,或者抽取事件时间的时间戳为null等。
19.9 通讯超时
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://…]] after [10000 ms]
Akka超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作。