当前位置: 首页 > article >正文

Fink CDC数据同步(四)Mysql数据同步到Kafka

依赖项

将下列依赖包放在flink/lib

flink-sql-connector-kafka-1.16.2

创建映射表

创建MySQL映射表

CREATE TABLE if not exists mysql_user (
     id     int,
     name   STRING,
     birth  STRING,
     gender    STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'= 'mysql-cdc',
    'hostname'= '192.168.0.1',
    'port'= '3306',
    'username'= 'user',
    'password'='password',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode'='initial',
    'database-name'= 'bigdata',
    'table-name'= 'user'
); 

select * from mysql_user;

创建upsert-kafka 表

CREATE TABLE kafka_user_upsert(
     id     int,
     name   string,
     birth  string,
     gender    string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'flink-cdc-user',
 'properties.bootstrap.servers' = '192.168.0.4:6668',
 'properties.group.id' = 'flink-cdc-kafka-group',
 'key.format' = 'json',
 'value.format' = 'json'
);

这里指定的Kafka topic会自动创建,也可以预先自行创建

生成作业

insert into kafka_user_upsert select * from mysql_user;

select * from kafka_user_upsert;


 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502


http://www.kler.cn/a/231597.html

相关文章:

  • 【学术会议征稿-第二届生成式人工智能与信息安全学术会议(GAIIS 2025)】人工智能与信息安全的魅力
  • 29. C语言 可变参数详解
  • 【C++】特殊类设计
  • PostgreSQL 约束
  • Deepseek技术浅析(一)
  • wordpress代码结构解析
  • 数据同步工具对比——SeaTunnel 、DataX、Sqoop、Flume、Flink CDC
  • 电力负荷预测 | 电力系统负荷预测模型(Python线性回归、随机森林、支持向量机、BP神经网络、GRU、LSTM)
  • golang 引入swagger(iris、gin)
  • Tkinter教程21:Listbox列表框+OptionMenu选项菜单+Combobox下拉列表框控件的使用+绑定事件
  • libevent源码解析--event,event_callback,event_base
  • 前端bug手册
  • 【npm】修改npm全局安装包的位置路径
  • 极智芯 | 解读国产CPU系列汇总
  • 【c++】STL详解(一):string类的使用
  • 子类将基类的虚函数替换为其自己的虚函数,共用的一个虚函数表,怎么不影响基类
  • 【python】绘制春节烟花
  • 微信小程序(三十九)表单信息收集
  • Java项目使用jasypt加密和解密配置文件中关键信息
  • Pycharm中以chrome打开HTML文件报错: Windows找不到文件‘Chrome‘
  • 使用SM4国密加密算法对Spring Boot项目数据库连接信息以及yaml文件配置属性进行加密配置(读取时自动解密)
  • navigator.mediaDevices.getUserMedia获取本地音频/麦克权限并提示用户
  • 本地部署TeamCity打包发布GitLab管理的.NET Framework 4.5.2的web项目
  • 【Kubernetes】kubectl top pod 异常?
  • Lua迭代器以及各种源函数的实现
  • 从零开始手写mmo游戏从框架到爆炸(十一)— 注册与登录