当前位置: 首页 > article >正文

Flink SQL Client 安装各类 Connector、Format 组件的方法汇总(持续更新中....)

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

一般来说,在 Flink SQL Client 中使用各种 Connector 只需要该 Connector 及其依赖 Jar 包部署到 ${FLINK_HOME}/lib 下即可。但是对于某些特定的平台,如果 AWS EMR、Cloudera CDP 等产品会有所不同,主要是它们中的某些 Jar 包可能被改写过,例如和 Hive Metastore 的交互,AWS EMR 就有另外一套 Metatstore:Glue Data Catalog,所以接口也做了相应的,所以,简单的复制开源的 Jar 包可能会有问题,最好做法还是从该平台/产品的集群上拷贝本地的 Jar 包。

以下脚本,以 EMR 6.15 ( Flink 1.17.1)为例,展示了各类常用 Connector 的安装方法,有的是直接下载自开源社区,有的则是从 EMR 集群本地找到相应 Jar 包安装的。脚本在 EMR 6.15 上全部测试通过,如果在其他版本的 EMR 或 Flink 上安装,请注意修改版本号。

FLINK_VERSION="1.17.1"
FLINK_MAJOR_VERSION="1.17"
HUDI_VERSION="0.14.0"
SCALA_MAJOR_VERSION="2.12"

安装大量的 Connector 可能会导致 Jar 包冲突,因此,建议做好如下两项准备工作:

1. 安装新的 Connector 或依赖包时,提前备份一版当前的 lib 库

sudo -u flink cp -r /usr/lib/flink/lib /usr/lib/flink/lib.$(date +'%Y%m%d%H%M').bak

2. 为了解决版本冲突,可以充分 Maven 的依赖解析能力,将需要同时安装的 Connector 的 Maven 依赖整合在一起,去 https://jar-download.com/online-maven-download-tool.php 一次性下载解析好的完整依赖包

另外,要特别提醒的是: 安装完毕后,务必重启新的 Yarn Session 方能生效。以下是单独安装各个常用 Connector、组件的方法:

1. Flink SQL Kafka Connector


```bash # install flink kafka connector for flink sql client # only run on master node is enough, owner of flink home dir is 'flink' user sudo -u flink wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_VERSION}/flink-sql-connector-kafka-${FLINK_VERSION}.jar -P /usr/lib/flink/lib/ ```

2. Flink Hudi Connector


```bash # install flink hudi connector for flink sql client # only run on master node is enough, owner of flink home dir is 'flink' user sudo -u flink wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_MAJOR_VERSION}-bundle/${HUDI_VERSION}/hudi-flink${FLINK_MAJOR_VERSION}-bundle-${HUDI_VERSION}.jar -P /usr/lib/flink/lib/ ``` # 3. Flink Hive Connector ( on AWS EMR )
如果 EMR 版本不是 6.15, 请注意替换以下 jar 包文件中的版本号,以所用 EMR 集群上的文件版本为准:
# install flink hive connector for flink sql client
# only run on master node is enough, owner of flink home dir is 'flink' user
# refer to this doc: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-configure.html
sudo -u flink cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
sudo -u flink cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /usr/lib/flink/lib 
sudo -u flink cp /usr/lib/hive/lib/libfb303-0.9.3.jar /usr/lib/flink/lib 
sudo -u flink cp /usr/lib/flink/opt/flink-connector-hive_${SCALA_MAJOR_VERSION}-${FLINK_VERSION}-amzn-1.jar /usr/lib/flink/lib

4. Debezium Confluent Avro 格式 (‘format’ = ‘debezium-avro-confluent’)


前往 [https://jar-download.com/online-maven-download-tool.php](https://jar-download.com/online-maven-download-tool.php) ,输入如下 Maven 依赖(**注意:如有其他同方式获取Jar包的组件,请合并为一份xml配置统一提交,避免出现 Jar 包版本冲突**):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.18.1</version>
</dependency>

点击 “Submit” 按钮,将 flink-avro-confluent-registry 及其依赖包下载到本地,然后将得到 jar_files.zip 包上传到集群主节点,并执行以下命令将 jar 包部署到 Flink SQL Client 的 lib 目录下:

# install flink 'debezium-avro-confluent' format for flink sql client
# only run on master node is enough, owner of flink home dir is 'flink' user
# refer to this doc: https://blog.csdn.net/bluishglc/article/details/135863249 , section 3.2
sudo -u flink unzip jar_files.zip -d /usr/lib/flink/lib/

更多详细介绍请参考《Flink 集成 Debezium Confluent Avro ( format=debezium-avro-confluent )》 一文的 3.2 节。

5. Flink JDBC Connector for MySQL


需要同时安装 flink-connector-jdbc 的 Jar 包和 MySQL 的 JDBC 驱动 Jar 包。
# install flink jdbc connector for flink sql client, note: flink-connector-jdbc_2.12-1.14.6.jar is wrong jar!!
sudo -u flink wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-${FLINK_MAJOR_VERSION}/flink-connector-jdbc-3.1.1-${FLINK_MAJOR_VERSION}.jar -P /usr/lib/flink/lib/


# install mysql jdbc driver 8.3.0
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.3.0.zip -P /tmp/
unzip /tmp/mysql-connector-j-8.3.0.zip -d /tmp/
sudo -u flink cp /tmp/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar /usr/lib/flink/lib
ls /usr/lib/flink/lib/mysql-connector-j-8.3.0.jar

# install mysql jdbc driver 5.1.49
# wget https://cdn.mysql.com/archives/mysql-connector-java-5.1/mysql-connector-java-5.1.49.zip -P /tmp/
# unzip /tmp/mysql-connector-java-5.1.49.zip -d /tmp/
# sudo -u flink cp /tmp/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49.jar /usr/lib/flink/lib

6. Flink MySQL CDC Connector (2.3.0)


Flink CDC 2.3.0 在官方Repo: [https://github.com/ververica/flink-cdc-connectors/tags](https://github.com/ververica/flink-cdc-connectors/tags) 上提供了 Uber Jar 供直接下载使用:
# install flink kafka connector for flink sql client
# only run on master node is enough, owner of flink home dir is 'flink' user
sudo -u flink wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar -P /usr/lib/flink/lib/

7. Flink MySQL CDC Connector (2.4+)


**注意:目前 Flink CDC 2.4+ 在官方Repo:[https://github.com/ververica/flink-cdc-connectors/tags](https://github.com/ververica/flink-cdc-connectors/tags) 上尚未提供制作好的 Uber Jar**,如果前往 [https://jar-download.com/online-maven-download-tool.php](https://jar-download.com/online-maven-download-tool.php) 自行制作 Jar 包 + 依赖包,部署后,会报如下错误:

[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.(Lio/debezium/config/Configuration;Ljava/util/Properties;)V

该问题的解释和解决方法参见:https://github.com/ververica/flink-cdc-connectors/issues/2423,鉴于自行重新编译并构建 Uber 包较为繁琐,可先使用 2.3.0 版本,该问题未来会修复。

8. Flink Changlog 格式 (‘format’ = ‘changelog-json’)

sudo -u flink wget https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.1.1/flink-format-changelog-json-2.1.1.jar -P /usr/lib/flink/lib/

安装后,重启 sql client 可能会报:java.lang.ClassNotFoundException: org.apache.flink.formats.json.JsonOptions 错误,应该是却少依赖包,由于后来并没有真正使用到该格式,所以此文暂时搁置。

9. Table Planner 和 Table Planner 加载器


从 Flink 1.15 开始,发行版包含两个 planner: `flink-table-planner ` 和 `flink-table-planner-loader`。这两个 planner JAR 文件的代码功能相同,但打包方式不同。若使用第一个文件,您必须使用与其相同版本的 Scala;若使用第二个,由于 Scala 已经被打包进该文件里,您不需要考虑 Scala 版本问题。

所以,有时候,我们需要在这两个 Planner 之间进行切换,以下脚本可以完成相互的切换工作:

# install flink-table-planner ( can only choose 1 between flink-table-planner & flink-table-planner-loader )
sudo -u flink mv /usr/lib/flink/lib/flink-table-planner-loader-${FLINK_VERSION}-amzn-1.jar /usr/lib/flink/lib/flink-table-planner-loader-${FLINK_VERSION}-amzn-1.jar.bak
sudo -u flink cp /usr/lib/flink/opt/flink-table-planner_${SCALA_MAJOR_VERSION}-${FLINK_VERSION}-amzn-1.jar /usr/lib/flink/lib

# revert to flink-table-planner-loader ( can only choose 1 between flink-table-planner & flink-table-planner-loader ) 
sudo -u flink rm -f /usr/lib/flink/opt/flink-table-planner_${SCALA_MAJOR_VERSION}-${FLINK_VERSION}-amzn-1.jar
sudo -u flink mv /usr/lib/flink/lib/flink-table-planner-loader-${FLINK_VERSION}-amzn-1.jar.bak /usr/lib/flink/lib/flink-table-planner-loader-${FLINK_VERSION}-amzn-1.jar

常见问题


1. [ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.(Lio/debezium/config/Configuration;Ljava/util/Properties;)V

该问题的解释和解决方法参见:https://github.com/ververica/flink-cdc-connectors/issues/2423,鉴于自行重新编译并构建 Uber 包较为繁琐,可先使用 2.3.0 版本,该问题未来会修复。


http://www.kler.cn/a/229813.html

相关文章:

  • Qt C++读写NFC标签NDEF网址URI
  • c++入门之 命名空间与输入输出
  • R语言在森林生态研究中的魔法:结构、功能与稳定性分析——发现数据背后的生态故事!
  • Vivado中Tri_mode_ethernet_mac的时序约束、分析、调整——(一)时序约束的基本概念
  • Unity2D初级背包设计后篇 拓展举例与不足分析
  • vue3如何使用bus(事件总线)
  • 数据结构:单链表
  • LeetCode 0292.Nim 游戏:脑筋急转弯
  • 【经典例子】Java实现2048小游戏(附带源码)
  • 【自然语言处理-工具篇】spaCy<1>--介绍及安装指南
  • 8个国外顶尖设计网站,设计师必备!
  • re:从0开始的CSS学习之路 2. 选择器超长大合集
  • Java锁到底是个什么东西
  • 92.网游逆向分析与插件开发-游戏窗口化助手-显示游戏数据到小助手UI
  • 12. onnx转为rknn测试时有很多重叠框的修改(python)
  • Bug地狱 #1 突然宕机,企业级应用到底怎么了
  • Android 无操作之后定时退出
  • 代码随想录算法训练营第二八天 | 分割 子集
  • Python 调用 OpenAI ChatGPT API
  • leetcode-top100链表专题二
  • Django通过Json配置文件分配多个定时任务
  • 比较两次从接口获取的数据,并找出变动的字段
  • 071:vue中过滤器filters的使用方法(图文示例)
  • Z函数的原理和应用:以Python为例
  • 微信自动预约小程序开发指南:从小白到专家
  • HiSilicon352 android9.0 开机视频调试分析