当前位置: 首页 > article >正文

tidb实时同步到mysql

客户要求实时同步表的数据到mysql,但这个表在tidb。
测试直接通过tidb cdc写入到mysql,有些字段是null,所以中间加了一个kafka实现

客户库中创建表
CREATE TABLE tb_1 (
    id bigint primary key,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    created_at timestamp,
    type smallint,
    remark string ,
    key i_cid(cid)
);
tidb配置cdc,写入到kafka

1、tidb添加cdc组件
2、配置cdc任务
cat your.toml

case-sensitive = true
enable-old-value = true
[filter]
rules = ['db.tb_1']

3、启动任务

tiup ctl:v5.3.0 cdc changefeed create --pd=http://pd-ip:2379 \
--sink-uri="kafka://kafka-ip:9092/your-topic?kafka-version=1.1.1&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json" --changefeed-id="my" --sort-engine="unified" \
--start-ts=453870757254529193 --config your.toml

protocol=canal-json 使用这个格式
–start-ts这个通过一次导出查看,cat db/metadata

dumpling -u root -p pwd -h tidb-ip -P 3306  -F 1GiB --compress gzip -t 2 -o db -B db -T db.tb --where "cid=123456"

4、更新一条数据,看看kafka是不是有了

创建SQL作业,从kafka消费后入mysql
CREATE TABLE tb_1 (
    id bigint,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    create_time timestamp,
    type smallint,
    remark string,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  -- 定义 Kafka 参数
  'connector' = 'kafka',
  'topic' = 'your-topic',
  'scan.startup.mode' = 'latest-offset', 
  'properties.bootstrap.servers' = 'ip:9092',
  'properties.group.id' = 'your-group',
  'format' = 'canal-json', -- tidb 支持该方式
  'canal-json.ignore-parse-errors' = 'false'
);



CREATE TABLE kh_tb_1 (
    id bigint,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    create_time timestamp,
    type smallint,
    remark string,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxxxx:3306/db?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
  'table-name' = 'tb_1',
  'username' = 'user',
  'password' = 'pwd',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '10'
);

insert into kh_tb_1 select * from tb_1 where cid=xxxxxxx;

最后启动作业任务即可。


http://www.kler.cn/a/559253.html

相关文章:

  • 【三十四周】文献阅读:DeepPose: 通过深度神经网络实现人类姿态估计
  • Leetcode 位计算
  • python~http的请求参数中携带map
  • Python基于flask的智慧交通可视化,大数据智慧交通数据可视化系统
  • 前后端项目部署服务器(传统部署和Docker部署)
  • 计算机毕业设计SpringBoot+Vue.js服装商城 服装购物系统(源码+LW文档+PPT+讲解+开题报告)
  • 解决jupyter notebook不是内部或外部命令问题
  • 近地面无人机遥感:如何利用高光谱数据反演植被生理参数?
  • Arm 将自己制造芯片
  • 改BUG:远程连接redis失败,可能是防火墙的问题
  • vue3中Watch和WatchEffect的用法和区别
  • 大语言模型中的 Token如何理解?
  • 基于 go-wrk 在 Windows 环境下对 Go Web 应用进行 HTTP 压力测试
  • 近10年气象分析(深度学习)
  • 演示基于FPGA的视频图像去雾处理效果
  • 蓝桥杯之阶段考核
  • DeepSeek 助力 Vue 开发:打造丝滑的单选按钮(Radio Button)
  • 国产超强开源大语言模型 DeepSeek-R1-70B 一键部署教程
  • 20250223下载并制作RTX2080Ti显卡的显存的测试工具mats
  • Go语言中使用viper绑定结构体和yaml文件信息时,标签的使用