当前位置: 首页 > article >正文

部署Flink1.20.1

1、设置环境变量

export JAVA_HOME=/cluster/jdk
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jarp
#export HIVE_HOME=/cluster/hive
export MYSQL_HOME=/cluster/mysql
export HADOOP_HOME=/cluster/hadoop3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export FLINK_HOME=/cluster/flink
export SPARK_HOME=/cluster/spark
export ZK_HOME=/cluster/zookeeper
export NACOS_HOME=/cluster/nacos
export KAFKA_HOME=/cluster/kafka
export DATART_HOME=/cluster/datart
export HBASE_HOME=/cluster/hbase
export SEATUNNEL_HOME=/cluster/seatunnel
export STREAMPARK_HOME=/cluster/streampark
export KYUUBI_HOME=/cluster/kyuubi
export DINKY_HOME=/cluster/dinky
export INLONG_HOME=/cluster/inlong
export DORIS_HOME=/cluster/doris
export BE_HOME=$DORIS_HOME/be
export FE_HOME=$DORIS_HOME/fe
export M2_HOME=/cluster/maven
export PATH=$PATH:$M2_HOME/bin:$BE_HOME/bin:$FE_HOME/bin:$DINKY_HOME/bin:$INLONG_HOME/bin:$DATART_HOME/bin:$KYUUBI_HOME/bin:$HBASE_HOME/bin:$SEATUNNEL_HOME/bin:$STREAMPARK_HOME/bin:$FLINK_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/SPARK_HOME:$KAFKA_HOME:$MYSQL_HOME/bin:$HIVE_HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$NACOS_HOME/bin:$ZK_HOME/bin

2、 flink的配置文件config.yaml

env:
  java:
    opts:
      all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
#==============================================================================
# Common
#==============================================================================
# Common
#==============================================================================
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: 0.0.0.0
    port: 6123
  memory:
    process:
      size: 1600m
  execution:
    failover-strategy: region
  archive:
    fs:
      dir: hdfs://10.10.10.99:9000/flink/completed-jobs/
taskmanager:
  bind-host: 0.0.0.0
  host: 0.0.0.0
  numberOfTaskSlots: 100
  memory:
    process:
      size: 1728m
    network:
      fraction: 0.1
      min: 64mb
      max: 1gb
parallelism:
  default: 1
fs:
  default-scheme: hdfs://10.10.10.99:9000
#==============================================================================
# High Availability zookeeper没有开启认证,应该尝试下怎么开启zookeeper的认证方式
#==============================================================================
high-availability:
  # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
  type: zookeeper
  # The path where metadata for master recovery is persisted. While ZooKeeper stores
  # the small ground truth for checkpoint and leader election, this location stores
  # the larger objects, like persisted dataflow graphs.
  #
  # Must be a durable file system that is accessible from all nodes
  # (like HDFS, S3, Ceph, nfs, ...)
  storageDir: hdfs:///flink/ha/
  zookeeper:
    # The list of ZooKeeper quorum peers that coordinate the high-availability
    # setup. This must be a list of the form:
    # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
    quorum: localhost:2181
    client:
      # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
      # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
      # The default value is "open" and it can be changed to "creator" if ZK security is enabled
      acl: open
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
# # Execution checkpointing related parameters. Please refer to CheckpointConfig and CheckpointingOptions for more details.
execution:
  checkpointing:
    interval: 3min
    externalized-checkpoint-retention: DELETE_ON_CANCELLATION
    max-concurrent-checkpoints: 1
    min-pause: 0s
    mode: EXACTLY_ONCE
    timeout: 10min
    tolerable-failed-checkpoints: 0
    unaligned: false
state:
  backend:
    type: hashmap
    incremental: false
  checkpoints:
    dir: hdfs://10.10.10.99:9000/flink/flink-checkpoints
  savepoints:
    dir: hdfs://10.10.10.99:9000/flink/flink-savepoints
#==============================================================================
# Rest & web frontend
#==============================================================================
rest:
  address: 0.0.0.0
  bind-address: 0.0.0.0
  web:
    submit:
      enable: true
    cancel:
      enable: true
#==============================================================================
# Advanced
#==============================================================================  
io:
  tmp:
    dirs: /tmp
classloader:
  resolve:
    order: child-first
#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================
# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL
# # The below configure how Kerberos credentials are provided. A keytab will be used instead of
# # a ticket cache if the keytab path and principal are set.
# security:
#   kerberos:
#     login:
#       use-ticket-cache: true
#       keytab: /path/to/kerberos/keytab
#       principal: flink-user
#       # The configuration below defines which JAAS login contexts
#       contexts: Client,KafkaClient
#==============================================================================
# ZK Security Configuration
#==============================================================================
# zookeeper:
#   sasl:
#     # Below configurations are applicable if ZK ensemble is configured for security
#     #
#     # Override below configuration to provide custom ZK service name if configured
#     # zookeeper.sasl.service-name: zookeeper
#     #
#     # The configuration below must match one of the values set in "security.kerberos.login.contexts"
#     login-context-name: Client
#==============================================================================
# HistoryServer
#==============================================================================
historyserver:
  web:
    address: 0.0.0.0
    port: 8082
  archive:
    fs:
      dir: hdfs://10.10.10.99:9000/flink/historyserver/completed-jobs/
      fs.refresh-interval: 10000

3、提交运行 

==========
flink提交命令
方式一、Per-Job 模式

作业提交命令行方式:

失败,不成功,下面问题1修复、问题2报错
/cluster/flink-1.15.0/bin/flink run -t yarn-per-job -d -ynm MysqlFlinkToKafkaUuid -Dyarn.application.name=MysqlFlinkToKafkaUuidYarn -c com.example.cloud.MysqlFlinkToKafkaUuid /cluster/mysql-flink-cdc-to-kafka-uuid-jar-with-dependencies.jar 3000

问题1、出现错误:
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
    at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2803)
    at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3059)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3018)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2991)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2871)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1223)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1835)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
    at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
    at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
您在 /var/spool/mail/root 中有邮件

解决办法:classloader.check-leaked-classloader: false


问题2:未测试
Caused by: java.io.InvalidClassException: com.fasterxml.jackson.databind.cfg.MapperConfig; incompatible types for field _mapperFeatures
毛毛虫雨

将debezium-connector-mysql依赖中的jackson-databind和jackson-core排除,引入2.13.3版本,然后重新编译打包就可以了(此答案整理自Flink CDC 社区)

/cluster/flink-1.15.0/bin/flink run -t yarn-per-job -d -ynm MysqlFlinkToKafkaUuidPerJob -Dyarn.application.name=MysqlFlinkToKafkaUuidPerJobYarn -c com.example.cloud.MysqlFlinkToKafkaUuidPerJob /cluster/mysql-flink-cdc-to-kafka-uuid-perJob-jar-with-dependencies.jar 3000


当前模式在Flink系统中已处于Deprecated状态,后续不推荐使用。

2、方式二、Session模式

(1)

-jm 2048: 指定 JobManager 的内存大小为 2048MB(即2GB)。
-tm 2048: 指定每个 TaskManager 的内存大小为 2048MB(即2GB)。
-s 2: 指定启动的 TaskManager 的数量为 2。
-z <zk-namespace>: 指定 ZooKeeper 命名空间。这个参数是可选的,用于指定 Flink 集群连接的 ZooKeeper 命名空间。
如果 Flink 集群配置了高可用性(High Availability),则需要指定 ZooKeeper 命名空间。
-nm <app-name>: 指定 YARN 应用的名称。这个参数是可选的,用于指定启动的 YARN 应用的名称。
-d: 启用调试模式。这个参数是可选的,如果指定了 -d,则会启用 Flink 的调试模式,可以用于排查问题。

失败
/cluster/flink-1.15.0/bin/yarn-session.sh -jm 2048 -tm 2048 -s 1 -nm yarn-session-app -d

打印输出id来
application_1715872393630_0002
(2)

/cluster/flink-1.15.0/bin/flink run -c com.example.cloud.MysqlFlinkToKafkaUuid  -yid application_1715872393630_0002 /cluster/mysql-flink-cdc-to-kafka-uuid-jar-with-dependencies.jar 3000

启动成功,flink的jar目录下添加4个jar文件
执行成功,见yarn-session1-5个效果截图


3、方式三、Application模式

启动成功,flink的jar目录下添加4个jar文件

/cluster/flink-1.15.0/bin/flink run-application -t yarn-application -Dparallelism.default=1 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=MysqlFlinkToKafkaUuid -Dtaskmanager.numberOfTaskSlots=1 -c com.example.cloud.MysqlFlinkToKafkaUuid /cluster/mysql-flink-cdc-to-kafka-uuid-jar-with-dependencies.jar 3000

/cluster/flink-1.15.0/bin/flink run-application -t yarn-application -Dparallelism.default=1 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=MysqlFlinkToKafkaId -Dtaskmanager.numberOfTaskSlots=1 -c com.example.cloud.MysqlFlinkToKafkaId /cluster/mysql-flink-cdc-to-kafka-id-jar-with-dependencies.jar 3000
 


http://www.kler.cn/a/564116.html

相关文章:

  • nginx 搭建 IPv6 -> IPv4 反向代理服务器
  • 淘宝商品搜索API实战:Elasticsearch分词与排序算法优化
  • 【数字图像处理三】图像变换与频域处理
  • 矩阵营销的 AI 进化:DeepSeek 如何助力批量运营账号?
  • 使用 Apache Dubbo 释放 DeepSeek R1 的全部潜力
  • Linux nc 命令详解
  • 如何下载MinGW-w64到MATLAB
  • Java集合性能优化面试题
  • 【Linux】CentOS7停服之后配置yum镜像源
  • Ubuntu 下通过 Docker 部署 Nginx 服务器
  • Ubuntu指令(一)
  • 如何优化Redis性能:从理论到实践
  • 苹果折叠屏iPhone突破折痕难题 或将在2026年发布
  • git 常用功能
  • AI快速变现之路,AI视频创作
  • Nacos 服务挂掉时如何恢复配置并访问缓存
  • 机器学习数学基础:31.信度与重测
  • Unity:应用关闭时执行函数方法
  • 如何合理使用Python爬虫按关键字搜索VIP商品:代码示例与实践指南
  • 软件测试八股文,软件测试常见面试合集【附答案】