Flink实战教程从入门到精通(基础篇)(三)Flink集群部署
一、集群角色
Flink提交作业和执行任务,需要几个关键组件:
客户端(Client):代码由客户端获取并做转换,之后提交给JobManger。
JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。
TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。
注意:Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的介绍:让大家有一个初步的认识,之后再展开讲述不同情形下的Flink部署。
二、集群搭建
1、集群规划
节点服务器 | node-1 | node-2 | node-3 |
角色 | JobManager TaskManager | TaskManager | TaskManager |
具体安装部署如下:
2、下载并减压安装包
2.1下载并解压安装包
(1)下载安装包 flink-1.17.0-bin-scala 2.12.tg2,将该jar包上传到 node-1节点服务器的/opt/software路径上。
(2)在/opt/software 路径上解压 flink-1.17.0-bin-scala 2.12.tgz 到/opt/module 路径上。
可以使用命令 tar -zxvf flink-1.17.0-bin-scala 2.12.tgz -C /opt/module,从而解压到/opt/module路径上。
2.2 修改集群配置
(1)进入conf路径,修改flink-confyaml文件,指定node-1节点服务器为JobManager
修改内容如下:
# JobManager节点地址.
jobmanager.rpc.address: node-1
jobmanager.bind-host: 0.0.0.0
rest.address: node-1
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node-1
(2)修改workers文件,指定node-1、node-2和node-3为TaskManager
修改内容如下:
修改如下内容:
node-1
node-2
node-3
(3)修改masters文件
root@node-1:/opt/module/flink-1.17.0/conf# vim masters
修改内容如下:
node-1:8081
(4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:
jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
关于Slot和并行度的概念,我们会在下一章做详细讲解。
3、分发安装目录
(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
(2)修改node-2节点的conf/flink-conf.yaml中的 taskmanager.host
修改内容如下:
# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: node-2
(3)修改node-3节点的conf/flink-conf.yaml中的 taskmanager.host
修改内容如下:
# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: node-3
4、启动集群
(1)在node-1节点服务器上执行start-cluster.sh启动Flink集群:
root@node-1:/opt/module/flink-1.17.0/bin# ./start-cluster.sh
(2)启动成功:
root@node-1:/opt/module/flink-1.17.0/bin# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node-1.
Starting taskexecutor daemon on host node-1.
Starting taskexecutor daemon on host node-2.
Starting taskexecutor daemon on host node-3.
root@node-1:/opt/module/flink-1.17.0/bin#
注意:
1、连接名:如果设置连接名为node-1、node-2、node-3,需要在三台服务器的/etc/hosts 文件中修改配置:三台服务器都需要配置
root@node-2:/opt/module/flink-1.17.0/conf# cat /etc/hosts
127.0.0.1 localhost
192.168.50.55 node-1
192.168.50.35 node-2
192.168.50.75 node-3
# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
root@node-2:/opt/module/flink-1.17.0/conf#
2、脚本权限:需要对/opt/module/flink-1.17.0/bin下面的所有脚本设置为最高权限,防止集群启动脚本权限不足,无法启动。三台服务器都需要配置。
5、访问Web UI
启动成功后,同样可以访问http://hadoop102:8081http://192.168.50.55:8081http://hadoop102:8081对flink集群和任务进行监控管理。
这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。
三、向集群提交作业
在上一章中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。本节我们将以该程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。
1、环境准备
在node-1中执行以下命令启动netcat
root@node-1:~# nc -lk 7777
2、程序打包
(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:
注意:cn.konne.un.WordCountUn,这个main方法包
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>cn.konne.un.WordCountUn</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
(2)插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。
NFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.579 s
[INFO] Finished at: 2025-03-21T21:25:26+08:00
[INFO] Final Memory: 67M/763M
[INFO] ------------------------------------------------------------------------
Process finished with exit code 0
3、在Web UI上提交作业
(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。
jar包完成后,如下图所示:
(2)点击该JAR包,出现任务配置页面,进行相应配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。
(4)测试
服务器node-1,执行命令nc -lk 7777,启动socket端口:
root@node-1:~# nc -lk 7777
hello
点击print,查看输出的节点,然后点击TaskManager,可以看到是node-1执行了程序,点击More查看具体日志。如下图:
自动跳转至节点执行日志,如下图:
点击标准输出 stdount,就可以查看标准日志,如下图:
(5)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。
4、命令行提交作业
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.17.0下
(1)首先需要启动集群。
root@node-1:/opt/module/flink-1.17.0/bin# bin/start-cluster.sh
(2)在node-1中执行以下命令启动netcat。
root@node-1:~# nc -lk 7777
(3)将flink程序运行jar包上传到/opt/module/flink-1.17.0路径。
(4)进入到flink的安装路径下,在命令行使用flink run命令提交作业。
bin/flink run -m node-1:8081 -c cn.konne.un.WordCountUn ./Flink-First-Demo-1.0-SNAPSHOT.jar
这里的参数 -m指定了提交到的JobManager,-c指定了入口类。
(5)在浏览器中打开Web UI,http://node-1:8081 查看应用执行情况。
用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。