flink cdc同步mysql数据
一、api
添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
<version>3.3-SNAPSHOT</version>
</dependency>
二、sql客户端
客户端只需要添加 flink-sql-connector-mysql-cdc
jar包(注意:和api依赖的包名区分),这里面已经整合了debezium-connector-mysql
模块
下载地址:
配置同步
CREATE TABLE mysql_cdc_source (
order_id STRING,
user_id STRING,
amount DECIMAL(10,2),
update_time TIMESTAMP(3),
dt STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'chb1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'paimon_test',
'table-name' = 'mysql_orders',
'server-time-zone' = 'Asia/Shanghai' -- 时区配置(避免时间偏差)
);
select * from mysql_cdc_source;
参考:https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/