Flink CDC to KADB: A Failed Adaptation Test
- Test objective
With MySQL as the source, use Flink CDC to synchronize MySQL data to KADB and verify Flink CDC's support for KADB.
- Test environment
Flink: flink-1.19.1
FlinkCDC: flink-cdc-3.3.0
MySQL: 5.7
KADB: V8R3
- Deployment
- Flink:
Extract the archive:
tar -xzf flink-*.tgz
Set the environment variable:
export FLINK_HOME=/path/flink-*
Edit $FLINK_HOME/conf/config.yaml:
address: 0.0.0.0
bind-address: 0.0.0.0
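In the nested config.yaml format used by Flink 1.19 these keys sit under the rest section; the fragment below is an assumption based on the default file shipped with Flink, so adjust it to your own configuration:
rest:
  address: 0.0.0.0        # address clients use to reach the REST/web endpoint
  bind-address: 0.0.0.0   # address the REST/web endpoint binds to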
Download the CDC connector and the PostgreSQL JDBC driver JARs and place them in $FLINK_HOME/lib:
flink-connector-jdbc-3.1.0-1.17.jar: Flink's JDBC connector
flink-sql-connector-mysql-cdc-2.4.0.jar: MySQL CDC connector for the MySQL source
postgresql-42.7.5.jar: PostgreSQL JDBC driver, used to connect to KADB over JDBC
Start Flink in standalone mode:
$FLINK_HOME/bin/start-cluster.sh
Open the Flink dashboard in a browser (by default at http://<jobmanager-host>:8081).
- FlinkCDC
Extract the archive:
tar -xzf flink-cdc-*.tar.gz
- Data preparation
Create the test database and tables in MySQL and insert some seed data:
CREATE DATABASE test;
USE test;
-- create a products table and insert some data
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
) AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
-- create an orders table and insert some data
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
Create the test database test in KADB, along with the target table that will receive the synchronized data:
test=# \d enriched_orders
Table "public.enriched_orders"
Column | Type | Modifiers
---------------------+-----------------------------+-----------
order_id | integer | not null
order_date | timestamp without time zone |
customer_name | character varying(100) |
price | numeric(10,5) |
product_id | integer |
order_status | boolean |
product_name | character varying(100) |
product_description | character varying(100) |
Indexes:
"enriched_orders_pkey" PRIMARY KEY, btree (order_id)
Distributed by: (order_id)
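The DDL for this target table can be reconstructed from the \d output above; a sketch (column types, primary key, and distribution key taken from the listing):
-- run in database test on KADB
CREATE TABLE enriched_orders (
    order_id            integer NOT NULL,
    order_date          timestamp without time zone,
    customer_name       varchar(100),
    price               numeric(10,5),
    product_id          integer,
    order_status        boolean,
    product_name        varchar(100),
    product_description varchar(100),
    PRIMARY KEY (order_id)
) DISTRIBUTED BY (order_id);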
- MySQL
Enable the binlog:
Add the following to /etc/my.cnf and restart MySQL (systemctl restart mysqld):
log-bin=/var/lib/mysql/mysql-bin
server-id=1
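After the restart, the settings can be verified from a MySQL session (a quick sanity check, not part of the original steps):
-- log_bin should report ON and server_id should match the value configured above
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'server_id';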
- Starting and configuring sql-client
$FLINK_HOME/bin/sql-client.sh
- Set the checkpointing interval to 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
CREATE TABLE orders ( -- table in sql-client, mapped to the MySQL orders table
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- name of the MySQL CDC connector
'hostname' = '192.168.85.x',
'port' = '3306',
'username' = 'root',
'password' = 'zhuyong',
'database-name' = 'test',
'table-name' = 'orders' -- the source table in MySQL to be synchronized
);
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'zhuyong',
'database-name' = 'test',
'table-name' = 'products'
);
Note the following error when creating these tables; the primary key must be declared NOT ENFORCED:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.error.SqlValidateException: Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode
Test access to the source table:
Run in sql-client: select * from orders;
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP,
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc', -- use the JDBC connector to connect to KADB
'url' = 'jdbc:postgresql://192.168.85.133:5432/test',
'username' = 'mppadmin',
'password'='123456',
'table-name' = 'enriched_orders' -- the target table must be created in KADB in advance
);
Verify the connection works: select * from enriched_orders;
- Submit the data synchronization job to Flink from sql-client
INSERT INTO enriched_orders
SELECT o.order_id,
o.order_date,
o.customer_name,
o.price,
o.product_id,
o.order_status,
p.name,
p.description
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;
The submitted job appears in the Flink dashboard in the browser.
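The job can also be listed directly from sql-client; SHOW JOBS is available in recent Flink releases (an optional check, not part of the original steps):
Flink SQL> SHOW JOBS;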
- Verify data synchronization
No data was synchronized to KADB.
The KADB log shows the following error:
2025-02-08 13:23:16.016699 CST,"mppadmin","test",p125126,th-574695296,"192.168.85.133","58800",2025-02-08 13:23:11 CST,0,con33,cmd2,seg-1,,,,sx1,"ERROR","42601","syntax error at or near ""ON""",,,,,,"INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",174,,"scan.l",1067,
2025-02-08 13:23:16.016753 CST,"mppadmin","test",p125126,th-574695296,"192.168.85.133","58800",2025-02-08 13:23:11 CST,0,con33,cmd2,seg-1,,,,sx1,"LOG","00000","An exception was encountered during the execution of statement: INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",,,,,,"INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",0,,"postgres.c",5021,
KADB does not support the INSERT ... ON CONFLICT syntax used here, so the data synchronization fails.
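For reference, the statement the JDBC sink generates (visible in the log above) follows the PostgreSQL 9.5+ upsert pattern below; the literal values are illustrative only:
INSERT INTO enriched_orders (order_id, order_date, customer_name, price,
                             product_id, order_status, product_name, product_description)
VALUES (10001, '2020-07-30 10:08:22', 'Jark', 50.50000, 102, false,
        'car battery', '12V car battery')
ON CONFLICT (order_id) DO UPDATE SET
    order_date          = EXCLUDED.order_date,
    customer_name       = EXCLUDED.customer_name,
    price               = EXCLUDED.price,
    product_id          = EXCLUDED.product_id,
    order_status        = EXCLUDED.order_status,
    product_name        = EXCLUDED.product_name,
    product_description = EXCLUDED.product_description;
-- KADB rejects the ON CONFLICT clause with "syntax error at or near ON"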
- Test conclusions
- The INSERT ... ON CONFLICT syntax was introduced in PostgreSQL 9.5. It lets you name a conflict target when inserting: if a row matching that target already exists it is updated, otherwise a new row is inserted. This is useful for handling duplicate data or for updating dynamically based on existing rows. The log shows that Flink CDC's incremental synchronization (via the JDBC sink) relies on this syntax, but KADB currently does not support this upsert capability.
- Target tables in a data warehouse are often very large. If an upsert by primary key has to run on every checkpoint (every 3 seconds here), will synchronization throughput suffer? The JDBC sink's batching options may reduce the write pressure; see the sketch below.
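On that second point, the JDBC connector's buffering options can cut the number of write round trips (option names taken from the flink-connector-jdbc documentation; the table name enriched_orders_batched is illustrative, and note this does not work around the ON CONFLICT limitation, since each flush still issues upsert statements):
CREATE TABLE enriched_orders_batched (
    order_id INT,
    order_date TIMESTAMP,
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://192.168.85.133:5432/test',
    'username' = 'mppadmin',
    'password' = '123456',
    'table-name' = 'enriched_orders',
    -- buffer rows and flush periodically instead of writing one upsert per change
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '10s'
);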