Flink in Practice (6): Implementing CDC with Flink SQL
About Flink SQL
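- Flink SQL is a development language with standard SQL semantics. It was designed to simplify the computation model of Flink real-time computing and to make real-time computing easier to adopt.
- In 2015, Alibaba began evaluating open-source stream processing engines. It eventually decided to build its next-generation compute engine on Flink, optimizing and improving on Flink's shortcomings. In early 2019 it open-sourced the resulting code, which is what we know as Blink. Blink's most notable contribution on top of the original Flink is its Flink SQL implementation (the Blink planner).
- Flink SQL is a user-facing API layer. Traditional stream processing frameworks such as Storm and Spark Streaming expose function-style or DataStream-style APIs, and users write their business logic in Java or Scala. This approach is flexible but has drawbacks:
  - it has a real learning curve;
  - tuning is hard;
  - the APIs have accumulated many incompatibilities across versions.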
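- In Flink SQL, identifiers such as table names and column names are case-sensitive. To stay compatible with Hive and other data warehouses, name tables and columns in lowercase with words joined by underscores, which avoids case problems. Hive, for example, is case-insensitive and converts every uppercase letter to lowercase. When Flink SQL reads or writes Hive tables whose names contain uppercase letters, it can fail with table-not-found or column-not-found errors. Keywords such as INSERT, SELECT and CREATE are case-insensitive. Every string literal in Flink SQL must be enclosed in ASCII single quotes; do not use ASCII double quotes or full-width (Chinese) quotation marks.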
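The rules above can be seen in a small sketch. The table `user_order` is hypothetical. It is backed by the built-in `datagen` connector with a bounded row count so the query terminates.

```sql
-- Hypothetical table: lowercase snake_case names resolve the same way in Flink and Hive
CREATE TABLE user_order (
  order_id  BIGINT,
  user_name STRING
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'  -- bounded, so the query below finishes
);

-- Keywords are case-insensitive: select/FROM/WHERE may be mixed freely
-- String literals must use single quotes
select order_id, user_name FROM user_order WHERE user_name = 'alice';

-- Identifiers are case-sensitive: this would fail, since the column is user_name
-- SELECT User_Name FROM user_order;
```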
Preparation
Environment
OS: Linux (CentOS 7)
Flink: 1.13.6
Go to Flink's lib directory:
cd flink-1.13.6/lib
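Upload the required dependency JARs. All of them are easy to find online, for example on Maven Central. The JARs play three roles:
- the CDC connectors provide the sources;
- the JDBC drivers are used by the jdbc sinks;
- the jdbc sinks also need Flink's own JDBC connector, which is not bundled with the Flink distribution.
flink-sql-connector-mysql-cdc-2.1.0.jar
mysql-connector-java-8.0.13.jar
flink-sql-connector-postgres-cdc-1.2.0.jar
postgresql-42.6.0.jar
flink-connector-jdbc_2.11-1.13.6.jar (use the _2.12 build if your Flink distribution is built for Scala 2.12)
Make sure each connector version is compatible with your Flink version (see the Flink CDC version compatibility table). For example, the 2.1.x CDC connectors target Flink 1.13.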
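Start a Flink cluster if one is not already running, then launch the Flink SQL client. Both commands are run from the Flink home directory:
cd ..
./bin/start-cluster.sh
./bin/sql-client.sh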
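Inside the client, session options can be set before creating the tables. This sketch assumes the Flink 1.13 `SET 'key' = 'value'` syntax. The 3-second interval and the parallelism of 1 are example values, not requirements. The Flink CDC quickstart enables checkpointing in this way, and with checkpoints a failed job can recover from the last checkpointed log position instead of starting over.

```sql
-- Take a checkpoint every 3 seconds (example value)
SET 'execution.checkpointing.interval' = '3s';
-- Optional: a single parallel instance is enough for a small test
SET 'parallelism.default' = '1';
```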
Flink SQL scripts
1. PostgreSQL -> PostgreSQL
-- Flink table mapped to the PostgreSQL source table (postgres-cdc source)
CREATE TABLE cdc_pg_source (
id INT,
age INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '10.254.21.3',
'port' = '54432',
'database-name' = 'postgres',
'schema-name' = 'public',
'username' = 'gpadmin',
'password' = 'xxxxxxx',
'table-name' = 'cdc_pg_source',
'decoding.plugin.name' = 'pgoutput',
'debezium.slot.name' = 'cdc_pg_source');
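With 'decoding.plugin.name' = 'pgoutput', the postgres-cdc source reads changes through PostgreSQL logical decoding, so the server must be configured for it. This sketch assumes a standard PostgreSQL 10+ server (pgoutput is built in from version 10) and a user allowed to run ALTER SYSTEM. REPLICA IDENTITY FULL is optional. It makes UPDATE/DELETE events carry the full previous row, and it also allows UPDATE/DELETE on a published table that has no primary key.

```sql
-- Run in PostgreSQL (psql), not in the Flink SQL client
-- Logical decoding requires wal_level = logical
SHOW wal_level;
-- If it is not 'logical', change it and then restart the PostgreSQL server
ALTER SYSTEM SET wal_level = logical;

-- Optional: include all old column values in UPDATE/DELETE change events
ALTER TABLE public.cdc_pg_source REPLICA IDENTITY FULL;
```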
-- Flink table mapped to the PostgreSQL sink table (jdbc sink)
CREATE TABLE cdc_pg_sink (
id INT,
age INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
'username' = 'gpadmin',
'password' = 'xxxxxx',
'table-name' = 'cdc_pg_sink',
'sink.buffer-flush.max-rows' = '1');
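The jdbc connector does not create the target table, so cdc_pg_sink must already exist in PostgreSQL. Below is a sketch of a matching physical table. The column types are assumptions; VARCHAR(255) is simply one choice for the Flink STRING column. Declaring id as the primary key lets the sink upsert rows by id, mirroring PRIMARY KEY (id) in the Flink DDL.

```sql
-- Run in PostgreSQL: physical target table for the Flink jdbc sink
CREATE TABLE public.cdc_pg_sink (
  id   INT PRIMARY KEY,  -- unique key used by the sink's upserts
  age  INT,
  name VARCHAR(255)      -- assumed length for the Flink STRING column
);
```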
-- flink job
INSERT INTO cdc_pg_sink select * from cdc_pg_source;
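To check the pipeline end to end, change rows in the source table from a PostgreSQL session while the Flink job is running, and watch them arrive in the sink. This sketch uses made-up sample values. It assumes cdc_pg_source has a primary key or REPLICA IDENTITY FULL, so that UPDATE and DELETE are allowed and captured.

```sql
-- Run in PostgreSQL while the Flink job is running (sample values are made up)
INSERT INTO public.cdc_pg_source (id, age, name) VALUES (1, 20, 'tom');
UPDATE public.cdc_pg_source SET age = 21 WHERE id = 1;
-- Shortly afterwards the sink should hold id = 1 with age 21
SELECT * FROM public.cdc_pg_sink;

DELETE FROM public.cdc_pg_source WHERE id = 1;
-- Shortly afterwards the row for id = 1 should be gone from the sink
SELECT * FROM public.cdc_pg_sink;
```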
2. MySQL -> MySQL
CREATE TABLE t_test (
id bigint,
username string,
password string,
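create_time time,
PRIMARY KEY (id) NOT ENFORCED  -- mysql-cdc 2.x needs a primary key for incremental snapshot reading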
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.252.92.4',
'port' = '3306',
'database-name' = 'flink_cdc_test',
'username' = 'root',
'password' = 'xxxx',
'table-name' = 't_test'
);
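The mysql-cdc source reads the MySQL binlog, so binary logging must be enabled in ROW format. The checks are sketched below. The GRANT is only needed if you use a dedicated account instead of root. 'flink_user' is a hypothetical, already-created account, and the privilege list follows the Flink CDC MySQL connector documentation (check the docs for your connector version).

```sql
-- Run in MySQL, not in the Flink SQL client
-- log_bin must be ON and binlog_format must be ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- Optional, for a dedicated account ('flink_user' is hypothetical)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
```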
CREATE TABLE t_test_ods (
id bigint primary key not enforced,  -- Flink only supports NOT ENFORCED primary keys
username string,
password string,
create_time time
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
'username' = 'root',
'password' = 'xxxx',
'table-name' = 't_test',
'sink.buffer-flush.max-rows' = '1'
);
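As in the PostgreSQL case, the jdbc sink writes into an existing table, here t_test in the flink_cdc_test_ods database. This sketch assumes the column types below. If create_time is actually a DATETIME column in MySQL, the Flink DDL would need TIMESTAMP instead of TIME.

```sql
-- Run in MySQL: physical target table for the Flink jdbc sink (types are assumptions)
CREATE TABLE flink_cdc_test_ods.t_test (
  id          BIGINT PRIMARY KEY,  -- lets the sink upsert rows by id
  username    VARCHAR(255),
  password    VARCHAR(255),
  create_time TIME                 -- matches the Flink column type TIME
);
```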
insert into t_test_ods select * from t_test;
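The CDC source can also be queried directly in the SQL client to inspect the changelog it produces. This sketch assumes the tableau result mode of the Flink 1.13 client. In that mode each row is shown with its change kind: +I for insert, -U/+U for the before/after images of an update, and -D for delete.

```sql
-- Print results inline, including a change-kind column
SET 'sql-client.execution.result-mode' = 'tableau';
-- Continuous query over the binlog-backed table; cancel it with Ctrl+C
SELECT * FROM t_test;
```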
Problems encountered
1. Could not find any factory for identifier 'postgres-cdc' (or 'mysql-cdc') that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
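Solution: the CDC connector JAR is not on Flink's classpath, so Flink cannot find the factory for 'postgres-cdc' or 'mysql-cdc'. The connectors' Maven coordinates are: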
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-sql-connector-postgres-cdc</artifactId>
  <version>1.4.0</version>
</dependency>
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  <version>1.4.0</version>
</dependency>
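Download the corresponding JAR, copy it into Flink's lib directory, and then restart the Flink cluster and sql-client. For the SQL Client, follow two rules:
- use the shaded flink-sql-connector-*-cdc JAR, not the thin flink-connector-*-cdc JAR, which does not bundle dependencies such as Debezium;
- keep only one version of each connector in lib, and make sure that version matches your Flink version.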