当前位置: 首页 > article >正文

Flink集群批作业实践:七析BI批作业执行

目录

背景

Flink架构介绍

JobManager

TaskManager

Flink集群模式的选择

Flink集群资源提供者的选择

Flink作业的提交

Flink作业项目开发

user jar准备

作业提交


背景

市场上比较常见的大数据批处理分布式计算引擎有Spark、MapReduce和Hive等,而把Flink当作批作业的执行引擎相对来说没那么主流。

不过,因为七析BI本身已经使用了FlinkCDC作为CDC的工具,从统一技术栈(Flink是一个流批一体化引擎)、减少学习成本和维护成本的角度考虑,我们团队选择了Flink来执行我们的批作业。

七析BI简介

七析BI是一款嵌入式自助 BI 平台,无需代码,通过简单的拖拉拽即可实现数据可视化大屏、丰富多样的报表。其中,“数据探索”功能是一个让用户在数据分析前对源数据进行过滤、分组、关联等操作的准备过程。

本文所提及的对Flink的使用,就是通过Flink的批处理能力将源表根据数据探索的逻辑来生成一个新的表。

七析BI数据探索功能示例图

Flink架构介绍

分布式计算引擎架构常见的核心组件通常有资源管理层、调度层和执行层等,Flink也不例外。

Flink集群的进程分为两大类: JobManager 和 TaskManager 。

Flink架构图(来自Flink官方)

JobManager

JobManager负责Flink集群的作业管理、资源管理和作业调度。

  • 作业管理

每一个Flink作业都会根据userJar和运行时的数据生成一个JobGraph,Flink会为每一个作业生成一个JobMaster来管理这个job的整个生命周期。

  • 资源管理

一个job可以分为多个task,负责执行这些task的是TaskManager中的slot。JobManager负责管理所有TaskManger上的slot的状态,使其可以方便地为每个task分配合适的资源。

  • 调度

Flik提供了一系列REST API来接收客户端的请求,用于管理作业、监控集群基本状态等。

在JobManager高可用模式下向Flink集群提交作业时,接收到REST请求的JobManger节点会将任务发送给Master节点执行。

TaskManager

TaskManger是Flink作业的实际执行单位。一个作业对应的JobGraph可分为多个task,而一个task又可包含一个或多个算子。故此也可认为算子链组成了task。

每一个TaskManger从逻辑上又分为多个slot,每一个slot可以同时执行一个task,但一个slot可以同步执行多个算子(算子链)。

每一个slot都会分配一个线程执行,所以slot是Flink作业的最小执行单位,而task slot的总数量代表整个Flink集群的并行度。

算子链-flink的一个优化,将多个算子放在同一个slot中执行可以减少数据在多个slot中传输导致的花销

TaskManager中slot、task、算子链的关系(来自Flink官方)

Flink群模式的选择

Flink的作业分为流作业和批作业两大类。其次,Flink的使用者们所处理的数据量和对数据的可用性要求都会存在差异。以上情况就会使得开发者们对Flink集群的需求有较大的差异。

面对这种差异,Flink提供了多种集群模式:

  • Flink Application集群

集群的生命周期与作业的生命周期一致。作业独享整个集群的资源。集群启动时需要指定一个 userJar,因为作业的入口从 userJar 的 main 方法开始,所以每个集群只能执行一个作业。

  • Flink Job集群

集群的生命周期与作业的生命周期一致。作业独享整个集群的资源。集群管理器或客户端为每一个作业启动一个集群,集群只能运行该作业。

  • Flink Session集群

集群的生命周期与作业无关。用户可向集群提交多个作业,所以资源的隔离性较差,存在资源竞争的情况。适合执行时间段、对稳定性要求不高的常见。

首先,现阶段七析BI的批作业都是以小数据量为主,执行时间在3分钟以内。其次,每一个作业对失败的容忍度较高(失败时不会影响用户使用,只是数据相对旧且可重试)。

Application模式和Job模式都需要为每个作业启动一个独立的集群执行。虽然资源的隔离性更高了,但时间和机器资源的花销更大了,显然是不合适的。

而使用Session模式时,集群启动时不用跟用户代码绑定,客户端可以不断地向集群提交不同的作业。由于不同的作业可以共享集群的资源,所以只需要有一个高可用的集群即可满足生产使用。运维成本和机器资源占用相对其他方式而言都是最低的。

所以,综合以上几点,Session模式是最适合我们的模式。

Flink集群资源提供者的选择

Flink的资源提供者有以下几种:

  • Standalone

Standalone集群部署、扩容、恢复和升级完全依赖人工,对运维的压力较大,不太适合大规模的生产化部署。

  • YARN

YARN模式通过YARN作为Flink的资源管理和调度程序,可以实现自动化资源管理、弹性伸缩和自动恢复等功能。适合团队本身有使用YARN的背景和大规模的生产化部署。

  • Native Kubernetes

Flink集成了Kubernetes的API,可以通过Flink脚本将集群部署到已有的Kubernetes集群上。有了Kubernetes,高可用、自动化资源管理等能力实现起来也非常方便。适合有Kubernetes和Flink经验的团队,也是官方推荐的部署方式一。

  • Flink Kubernetes Operator

Flink社区提供的独立项目,可通过operator以云原生的方式管理Flink集群。可自动处理Flink集群的部署、资源伸缩、升级,以及蓝绿发布、滚动升级等高级功能,是未来的趋势。适合有Kubernetes经验,已经进行大规模生产化部署的团队。

结合使用规模和运维成本,七析BI选择的是Native Kubernetes的方式。它部署简单、自动化程度高,在我们的场景下用起来还是挺方便的。

详细的部署步骤这里就不多说了,可以直接参考Flink官方文档:

Native Kubernetes | Apache Flink

Flink作业的提交

Flink提交作业的方式大部分都是在集群启动时指定的userJar(流作业)、使用Flink SQL或者通过Flink的本地程序提交。而七析BI都是通过后端服务触发作业提交,如果可以通过http的方式向Flink提交作业,那对客户端来说是最方便的。

Flink提供了一系列用于监控和管理的REST API,包括提交作业和查询作业运行结果。

Flink作业项目开发

  • 通过maven archetype创建项目
mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.20.0

这允许你命名新建的项目,而且会交互式地询问 groupId、artifactId、package 的名字。

  • 项目依赖

创建好的项目的pom文件中已经包含了最基本的Flink Streaming API程序的依赖和打包插件。通常来说,你还需要添加这些依赖:

  1. 其他Flink API | 如Flink Table API
  2. 连接器 | 用于与其他数据源进行数据读写。参考:连接器和格式 | Apache Flink
  3. 单元测试

Flink API依赖

可以参考一下表格选择你要使用的API,添加到你的项目依赖中:

你要使用的 API

你需要添加的依赖项

DataStream

flink-streaming-java

DataStream Scala 版

flink-streaming-scala_2.12

Table API

flink-table-api-java

Table API Scala 版

flink-table-api-scala_2.12

Table API + DataStream

flink-table-api-java-bridge

Table API + DataStream Scala版

flink-table-api-scala-bridge_2.12

user jar准备

通过maven archetype创建的项目中包含了maven-shade-plugin,它的作用是将项目代码和其他依赖打包成一个fat jar。有了这个fat jar后,你就可以将它上传到集群上并执行你的作业了。

存在的问题

在高可用配置下,这个fat jar在每次作业提交时都会被Flink重复上传到文件系统上。这就造成了很多不必要的时间和带宽的花销,所以需要对user jar做一个优化。

user jar拆分

Flink自己有一个lib目录用来放置自身的java代码,我们可以利用这个目录来放置一些我们fat jar中的依赖。

第一步是将原来的fat jar拆分成一个thin jar和lib jar,thin jar是我们的flink程序代码,lib jar是我们程序的其他依赖。

第二,像原来一样,thin jar依然上传到Flink;而lib jar就放到Flink的lib路径。

user jar拆分前后对比

最后,我们需要添加一个maven-jar-plugin用于构建我们的thin jar;原来的maven-shade-plugin用于构造我们lib jar(需要把user code排除掉)。

如下所示:

<!-- 构建 thin jar-->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.2.0</version>
    <configuration>
        <archive>
            <manifest>                                                                    <mainClass>com.App</mainClass>
            </manifest>
        </archive>
    </configuration>
</plugin>

<!-- 构建 lib jar-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<!-- 构造所有依赖包放到flink的lib目录下-->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<id>flink-lib-jar</id>                                
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>true</shadedArtifactAttached>
<finalName>flink-app-lib</finalName>
<artifactSet>
<excludes>
 <!--flink 用到的依赖,按需调整-->
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.apache.flink:*</exclude>
<!-- 排除当前工程 -->
<exclude>com.groupId:artifactId</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>

</configuration>
</execution>
</executions>
</plugin>

user jar上传

上面构建出的thin jar有两种方式上传到Flink集群:

  • 通过Flink监控页面的上传jar功能
  • 之前把jar包放置在flink保存上传jar的目录下

而lib jar则直接放置到flink的lib目录下。

* 注意:每次更新lib都需要更新镜像并重启。

作业提交

通过http的方式调用Flink的作业运行接口:

/jars/:jarid/run
{
    "programArgsList":["参数1", "参数2"]
}

jarid是Flink上传目录中对应jar的文件名或者在Flink监控页面查看,如下图:

programArgsList参数放置在request body中,可作为我们程序main方法的入参。

到这里,我们就成功地将我们批作业提交到Flink集群了。

道一云七巧-与你在技术领域共同成长

更多技术知识分享:https://bbs.qiqiao668.com/


http://www.kler.cn/a/449787.html

相关文章:

  • windows下安装配置anaconda及常用的conda命令
  • C++设计模式:享元模式 (附文字处理系统中的字符对象案例)
  • 大模型(LLM)提示工程(Prompt Engineering)初识
  • [Unity]Unity集成NuGet-连接mysql时的发现
  • shardingsphere分库分表项目实践1-让shardingsphere运行起来
  • unity Toggle制作滑动开关
  • 【源码阅读系列】(六) Android 中的进程和线程
  • kubevirt网络
  • Jmeter测试脚本编写技巧
  • 从零开始学前端之HTML(三)
  • 咸虾米壁纸微信小程序下载图片到相册saveImageToPhotosAlbum功能修改
  • PLSQL 客户端连接 Oracle 数据库配置
  • 算法day_3数组中的单一元素和二进制位颠倒
  • autMan奥特曼机器人-相关命令
  • 【漏洞复现】F5 BIG-IP Next Central Manager SQL注入漏洞(CVE-2024-26026)
  • (10)YOLOv8算法基本原理
  • EasyPlayer.js播放器在React项目中应如何使用?
  • Jenkins Api Token 访问问题
  • MySQL 数据备份与恢复详解
  • 压缩为zip和gzip工具类
  • MySQL快速扫描
  • ios按键精灵脚本开发:ios悬浮窗命令
  • PHP中替换某个包或某个类
  • Linux 软硬链接详解:深入理解与实践
  • Ubuntu下ESP32-IDF开发环境搭建
  • C++ 虚函数、虚函数表、静态绑定与动态绑定笔记