当前位置: 首页 > article >正文

paimon---同步mysql数据到paimon表中

1.1、mysql源表

CREATE TABLE `mysql_orders` (
  `order_id` varchar(100) NOT NULL,
  `user_id` varchar(100) DEFAULT NULL,
  `amount` decimal(10,2) DEFAULT NULL,
  `update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`order_id`)
)

mysql 开启bin_log, 设置ROW

1.2、flink cdc同步mysql数据

参考: https://blog.csdn.net/wuxintdrh/article/details/146165736

CREATE TABLE mysql_cdc_source (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
    dt STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'chb1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'paimon_test',
    'table-name' = 'mysql_orders',
    'server-time-zone' = 'Asia/Shanghai'  -- 时区配置(避免时间偏差)
);
select * from mysql_cdc_source;


1.3、同步到paimon

创建paimon表

CREATE TABLE orders (
    order_id STRING PRIMARY KEY NOT ENFORCED,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
	dt STRING
) WITH (
    'merge-engine' = 'deduplicate',       -- 默认去重引擎,保留最新记录
    'changelog-producer' = 'input',       -- 直接存储 CDC 的原始变更日志
    'bucket' = '4',                       -- 分桶优化写入性能
    'snapshot.time-retained' = '7d'       -- 保留 7 天快照
);

同步数据

INSERT INTO paimon_catalog.`default`.orders
SELECT 
    order_id, 
    user_id, 
    amount, 
    update_time, 
    DATE_FORMAT(update_time, 'yyyy-MM-dd') AS dt  -- 动态分区
FROM default_catalog.default_database.mysql_cdc_source;

查询paimon表:

select * from paimon_catalog.`default`.orders;

报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

End of exception on server side

排查发现jobManager资源充足,taskManager slot还有可用,taskManager memory资源偏小,调大资源后运行正常。

二、通过paimon-flink-action同步数据

参考:https://paimon.apache.org/docs/1.0/cdc-ingestion/mysql-cdc/
报错:ClassNotFoundException: org.apache.kafka.connect.errors.ConnectException,引入connect-api-3.2.1.jar

又报错: java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;


http://www.kler.cn/a/583346.html

相关文章:

  • uv python包管理工具
  • [极客大挑战 2019]FinalSQL【SQL布尔盲注】
  • Ubuntu 使用快速入门 | Android Framework
  • 电机控制常见面试问题(九)
  • 用TypeScript和library needle来创建视频爬虫程序
  • 使用AI一步一步实现若依前端(6)
  • 【每日学点HarmonyOS Next知识】路由栈问题、图片圆角、颜色资源转十六进制字符串、数据集变化崩溃、组件声明周期
  • Qt | 屏幕截图实现
  • deepseek的regflow安装mac版本
  • 【反无人机目标检测数据集】MIDGARD:关于基于机器学习的微型无人机视觉相对定位的训练数据集
  • 简述你对 Spring MVC 的理解
  • ubuntu-drivers-common 包功能详解
  • 每天一篇《目标检测》文献(三)
  • Python----数据可视化(Pyecharts一:介绍安装,全局配置,系列配置)
  • Vue 组件通信 - 子传父
  • ctfhub-web-SSRF通过攻略
  • ffmpeg实用技巧:使用ffmpeg命令行从视频文件中提取帧画面并保存为图片
  • 桂链:什么是区块链账本?
  • 为什么 HTTP GET 方法不使用请求体?
  • [笔记.AI]KAG(知识增强生成 Knowledge Augmented Generation)