
A Failed Attempt at Adapting Flink CDC to KADB

  • Test objective

Use Flink CDC to synchronize data from a source MySQL instance into KADB, to verify whether Flink CDC supports KADB as a sink.

  • Test environment

Flink: flink-1.19.1

FlinkCDC: flink-cdc-3.3.0

MySQL: 5.7

KADB: V8R3

  • Deployment
  1. Flink

Unpack:

tar -xzf flink-*.tgz

Set the environment variable:

export FLINK_HOME=/path/flink-*

Edit the $FLINK_HOME/conf/config.yaml file:

address: 0.0.0.0

bind-address: 0.0.0.0

Download the CDC connector and the PostgreSQL JDBC driver jars and place them in the $FLINK_HOME/lib directory:

flink-connector-jdbc-3.1.0-1.17.jar: Flink's JDBC connector

flink-sql-connector-mysql-cdc-2.4.0.jar: the Flink MySQL CDC connector for the MySQL source

postgresql-42.7.5.jar: KADB is accessed over JDBC, so this PostgreSQL driver serves as the KADB JDBC driver
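Before starting the cluster it is easy to verify that all three jars actually landed in the lib directory. A minimal sketch in Python; the prefix list is taken from the jar names above, and the directory path is whatever $FLINK_HOME/lib resolves to in your setup:

```python
from pathlib import Path

# Jars this setup expects in $FLINK_HOME/lib (prefixes from the list above).
REQUIRED_PREFIXES = [
    "flink-connector-jdbc",           # Flink JDBC connector (sink side)
    "flink-sql-connector-mysql-cdc",  # MySQL CDC source connector
    "postgresql",                     # PostgreSQL driver, used here to reach KADB
]

def missing_jars(lib_dir):
    """Return the required jar prefixes with no matching file in lib_dir."""
    names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [pre for pre in REQUIRED_PREFIXES
            if not any(n.startswith(pre) for n in names)]
```

If the returned list is non-empty, copy the missing jars in before running start-cluster.sh; Flink only picks up lib jars at startup.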

Start Flink in standalone mode:

$FLINK_HOME/bin/start-cluster.sh

View the Flink dashboard in a browser.

  2. FlinkCDC

Unpack:

tar -xzf flink-cdc-*.tar.gz

  • Data preparation

In MySQL, create a test database and test tables, and insert some seed data:

CREATE DATABASE test;

USE test;

-- Create a products table and insert some rows
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

-- Create an orders table and insert some rows
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether the order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

In KADB, create a test database test and a target table to receive the synchronized data:

test=# \d enriched_orders
                Table "public.enriched_orders"
       Column        |            Type             | Modifiers
---------------------+-----------------------------+-----------
 order_id            | integer                     | not null
 order_date          | timestamp without time zone |
 customer_name       | character varying(100)      |
 price               | numeric(10,5)               |
 product_id          | integer                     |
 order_status        | boolean                     |
 product_name        | character varying(100)      |
 product_description | character varying(100)      |
Indexes:
    "enriched_orders_pkey" PRIMARY KEY, btree (order_id)
Distributed by: (order_id)

  3. MySQL

Enable the binlog.

Add the following to /etc/my.cnf, then restart MySQL: systemctl restart mysqld

log-bin=/var/lib/mysql/mysql-bin

server-id=1
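MySQL CDC cannot read changes without these two settings, so it is worth confirming they took effect. A hedged sketch that checks a my.cnf-style text; the parsing is deliberately simplistic (it ignores [section] headers and treats every non-comment line as key=value, which is enough for the fragment above):

```python
def binlog_enabled(my_cnf_text):
    """Return True if both log-bin and server-id are set in a my.cnf-style text.

    Simplified parser: skips blank lines, comments, and [section] headers,
    then collects the key of every key=value line.
    """
    keys = set()
    for line in my_cnf_text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "[")):
            continue
        if "=" in line:
            keys.add(line.split("=", 1)[0].strip())
    return "log-bin" in keys and "server-id" in keys
```

On a running instance, SHOW VARIABLES LIKE 'log_bin'; is the authoritative check that the binlog is actually on.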

  • SQL client startup and configuration

$FLINK_HOME/bin/sql-client.sh

Set the checkpoint interval to 3 seconds:

Flink SQL> SET execution.checkpointing.interval = 3s;

CREATE TABLE orders (            -- table in the SQL client, mapped to the MySQL orders table
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',   -- name of the MySQL CDC connector
    'hostname' = '192.168.85.x',
    'port' = '3306',
    'username' = 'root',
    'password' = 'zhuyong',
    'database-name' = 'test',
    'table-name' = 'orders'      -- source table in MySQL to synchronize
 );

CREATE TABLE products (
   id INT,
   name STRING,
   description STRING,
   PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = 'zhuyong',
    'database-name' = 'test',
    'table-name' = 'products'
 );

Watch out for this error when creating the tables:

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.sql.parser.error.SqlValidateException: Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode

Test access to the source table. In the SQL client, run:

select * from orders;

CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP,
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'jdbc',        -- use the JDBC connector to reach KADB
    'url' = 'jdbc:postgresql://192.168.85.133:5432/test',
    'username' = 'mppadmin',
    'password' = '123456',
    'table-name' = 'enriched_orders'   -- the target table must be created in advance
 );

Verify the connection works: select * from enriched_orders;

  • Submit the data synchronization job to Flink from the SQL client

INSERT INTO enriched_orders
  SELECT o.order_id,
         o.order_date,
         o.customer_name,
         o.price,
         o.product_id,
         o.order_status,
         p.name,
         p.description
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id;
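The per-row semantics of this continuous LEFT JOIN can be illustrated outside Flink. A minimal Python sketch of the same enrichment, with source rows abbreviated from the test data above (this shows the result shape only, not how Flink executes the streaming join):

```python
# Source rows, abbreviated from the MySQL test data above.
products = {
    102: ("car battery", "12V car battery"),
    105: ("hammer", "14oz carpenter's hammer"),
    106: ("hammer", "16oz carpenter's hammer"),
}
orders = [
    (10001, "2020-07-30 10:08:22", "Jark", 50.50, 102, False),
    (10002, "2020-07-30 10:11:09", "Sally", 15.00, 105, False),
    (10003, "2020-07-30 12:00:30", "Edward", 25.25, 106, False),
]

def enrich(orders, products):
    """LEFT JOIN orders with products on product_id, like the Flink SQL job."""
    out = []
    for (oid, odate, cust, price, pid, status) in orders:
        # LEFT JOIN semantics: unmatched orders are kept with NULL product columns.
        name, desc = products.get(pid, (None, None))
        out.append((oid, odate, cust, price, pid, status, name, desc))
    return out
```

Each output tuple has the same eight columns as the enriched_orders target table.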

Check the job status on the Flink dashboard in the browser.

  • Verify data synchronization

No data was synchronized to KADB.

The KADB log reports the following error:

2025-02-08 13:23:16.016699 CST,"mppadmin","test",p125126,th-574695296,"192.168.85.133","58800",2025-02-08 13:23:11 CST,0,con33,cmd2,seg-1,,,,sx1,"ERROR","42601","syntax error at or near ""ON""",,,,,,"INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",174,,"scan.l",1067,

2025-02-08 13:23:16.016753 CST,"mppadmin","test",p125126,th-574695296,"192.168.85.133","58800",2025-02-08 13:23:11 CST,0,con33,cmd2,seg-1,,,,sx1,"LOG","00000","An exception was encountered during the execution of statement: INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",,,,,,"INSERT INTO enriched_orders(order_id, order_date, customer_name, price, product_id, order_status, product_name, product_description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (order_id) DO UPDATE SET order_id=EXCLUDED.order_id, order_date=EXCLUDED.order_date, customer_name=EXCLUDED.customer_name, price=EXCLUDED.price, product_id=EXCLUDED.product_id, order_status=EXCLUDED.order_status, product_name=EXCLUDED.product_name, product_description=EXCLUDED.product_description",0,,"postgres.c",5021,

KADB does not support the INSERT ... ON CONFLICT syntax used here, so the data synchronization fails.
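Since the target rejects ON CONFLICT, one common workaround is to split each upsert into a DELETE followed by an INSERT, executed in one transaction. A hedged sketch that generates such a statement pair (statement text only; execution, identifier quoting, and batching are out of scope, and wiring this into Flink would require a custom JDBC dialect rather than the stock PostgreSQL one):

```python
def upsert_as_delete_insert(table, key, columns):
    """Build a portable DELETE+INSERT pair replacing INSERT ... ON CONFLICT DO UPDATE.

    Uses JDBC-style '?' placeholders; run both statements in one transaction
    so the pair behaves atomically like the original upsert.
    """
    cols = ", ".join(columns)
    marks = ", ".join("?" for _ in columns)
    delete = f"DELETE FROM {table} WHERE {key} = ?"
    insert = f"INSERT INTO {table} ({cols}) VALUES ({marks})"
    return delete, insert
```

For the enriched_orders table this would delete by order_id and reinsert the full row on every change, which also has the throughput cost discussed in the conclusions below.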

  • Test conclusions
  1. The INSERT ... ON CONFLICT syntax was introduced in PostgreSQL 9.5. It lets an insert specify a conflict condition: if a row matching that condition already exists, the row is updated; otherwise a new row is inserted. This is very useful for deduplicating data or dynamically updating existing rows. The logs show that Flink CDC relies on this syntax to implement incremental synchronization, but KADB currently does not support this upsert feature.
  2. Target tables in a data warehouse are often very large. If an upsert keyed on the primary key must run at every checkpoint (every 3 seconds here), synchronization throughput may suffer even on a target that supports the syntax.

