当前位置: 首页 > article >正文

Flink实战-(6)FlinkSQL实现CDC

FlinkSQL说明

  • Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
  • 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
  • Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。
  • 在 flink sql 中,对表名、字段名、函数名等是严格区分大小写的,为了兼容 hive 等其他仓库,建议建表时,表名和字段名都采用下划线连接单词的方式,以避免大小写问题。比如 hive ,是不区分大小写的,所有大写字母最终都会被系统转化为小写字母,此时使用 flink sql 去读写 hive ,出现大写字母时,会出现找不到表或字段的错误。关键字是不区分大小写的,比如 insert、select、create等。flink sql 中所有的字符串常量都需要使用英文单引号括起来,不要使用英文双引号以及中文符号。

前期准备

依赖的环境

环境:Linux(Centos7)
Flink : 1.13.6

进入Flink的lib目录

cd flink-1.13.6/lib

上传相关的依赖包,这几个包在网上很容易找到

flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
flink-sql-connector-postgres-cdc-1.2.0.jar
postgresql-42.6.0.jar

启动 Flink客户端

./flink-1.13.1/bin/sql-client.sh

Flink-SQL脚本

1、postgresql ->postgresql

-- pg中映射表,source
CREATE TABLE cdc_pg_source (
 id INT,
 age INT,
 name STRING
) WITH (
 'connector' = 'postgres-cdc',
  'hostname' = '10.254.21.3',
  'port' = '54432',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'username' = 'gpadmin',
  'password' = 'xxxxxxx',
  'table-name' = 'cdc_pg_source',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'cdc_pg_source');




-- pg中映射表,sink
CREATE TABLE cdc_pg_sink (
 id INT,
 age INT,
 name STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
  'username' = 'gpadmin',
  'password' = 'xxxxxx',
  'table-name' = 'cdc_pg_sink',
  'sink.buffer-flush.max-rows' = '1');
 


-- flink job
INSERT INTO cdc_pg_sink select * from cdc_pg_source;

2、mysql -> mysql

CREATE TABLE t_test (
 id bigint,
 username string,
 password string,
 create_time time
) WITH (
 'connector' = 'mysql-cdc',
  'hostname' = '10.252.92.4',
  'port' = '3306',
  'database-name' = 'flink_cdc_test',
  'username' = 'root',
  'password' = 'xxxx',
  'table-name' = 't_test'
);

CREATE TABLE t_test_ods (
 id bigint primary key,
 username string,
 password string,
 create_time time
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
  'username' = 'root',
  'password' = 'xxxx',
  'table-name' = 't_test',
  'sink.buffer-flush.max-rows' = '1'
);

insert into t_test_ods select * from t_test;

遇到的问题

1、Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.  或  Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

解决方法:

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

下载改JAR包,把它加到Flink下的lib路径下,然后重启sql-client;


http://www.kler.cn/a/15426.html

相关文章:

  • 【算法】【优选算法】前缀和(下)
  • 【windows】05-windows系统级深度隐藏文件方法
  • 计算机组成与原理(2) basic of computer architecture
  • 人工智能与SEO优化中的关键词策略解析
  • [ACTF2020]Upload 1--详细解析
  • 处理namespace问题:Namespace not specified for AGP 8.0.0
  • 冬奥会传统文化管理系统【GUI/Swing+MySQL】(Java课设)
  • DJ4-3 连续分配存储管理方式
  • window 10 安装node.js时遇到2502 2503错误(已解决)
  • 从起步到成熟:探讨APP在不同发展阶段的商业化路径和变现模式
  • MySQL提权/条件竞争漏洞分析和利用(37)
  • Android Jetpack Compose —— 控件
  • Linux上常用的防火墙软件iptables详解
  • Html5版贪吃蛇游戏制作(经典玩法)
  • IT 面试手册 - 序
  • 【KVM虚拟化】· 图形化KVM安装linux
  • 企业数字化转型的核心是什么?如何才能真正做到数字化转型?
  • Go官方指南(一)包、变量、函数
  • Vue3 element-plus el-select 无法选中,又不报错
  • Docker在Windows系统中的安装方法和使用方法
  • 【学习笔记】- 零基础学React
  • Linux命令集(Linux文件管理命令--mv指令篇)
  • 【文心一言】文心一言最近这么火,它到底是什么
  • 算法基础(二)(共有30道例题)
  • 谁说不能用中文写代码?
  • ChatGPT写小论文