
Flink series: syncing data from MySQL to Doris and StarRocks with Flink CDC 3


  • 1. Download and deploy Flink
  • 2. Download and deploy Flink CDC 3
  • 3. Download mysql-connector-java into the lib directories of Flink and Flink CDC
  • 4. Configure Flink checkpointing to support incremental sync
  • 5. YAML configuration files for MySQL to Doris and StarRocks
  • 6. Start Flink and Flink CDC
  • 7. Inspect the Flink CDC sync logs
  • 8. Compare the MySQL and StarRocks tables
  • 9. Flink CDC in production

1. Download and deploy Flink

  • Download Flink

Extract Flink:

tar -zxvf flink-1.19.1-bin-scala_2.12.tgz

Edit the Flink configuration file config.yaml:

taskmanager:
  bind-host: localhost
  host: localhost
  numberOfTaskSlots: 6
  memory:
    process:
      size: 1728m

parallelism:
  default: 1
rest:
  address: 10.66.77.104
  # network interface, such as 0.0.0.0.
  bind-address: 10.66.77.104
  # port: 8081
  # # Port range for the REST and web server to bind to.
  # bind-port: 8080-8090

Set the Flink environment variables:

cd /etc/profile.d
cat flink.sh 

#export HADOOP_CLASSPATH=`hadoop classpath`
FLINK_HOME=/data/src/flink/flink-1.19.1
PATH=$PATH:$FLINK_HOME/bin:$FLINK_HOME/sbin

export PATH
export FLINK_HOME

Start Flink:

./start-cluster.sh

Check the running processes with jps:

jps
760234 StandaloneSessionClusterEntrypoint
390132 Jps
760880 TaskManagerRunner

Open the Flink web UI at {ip}:{port} (the rest.address and port configured above).

2. Download and deploy Flink CDC 3

  • Download a release from https://github.com/apache/flink-cdc/releases

Extract Flink CDC 3:

tar -zxvf flink-cdc-3.3.0-bin.tar.gz

Download the Pipeline Connector jars and Source Connector jars into the lib directory:

ls /data/src/flink/flink-cdc-3.3.0/lib
flink-cdc-dist-3.3.0.jar                              flink-cdc-pipeline-connector-maxcompute-3.3.0.jar  flink-sql-connector-tidb-cdc-3.3.0.jar
flink-cdc-pipeline-connector-doris-3.3.0.jar          flink-cdc-pipeline-connector-mysql-3.3.0.jar       mysql-connector-java-8.0.28.jar
flink-cdc-pipeline-connector-elasticsearch-3.3.0.jar  flink-cdc-pipeline-connector-paimon-3.3.0.jar
flink-cdc-pipeline-connector-kafka-3.3.0.jar          flink-cdc-pipeline-connector-starrocks-3.3.0.jar

3. Download mysql-connector-java into the lib directories of Flink and Flink CDC

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28


wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
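The downloaded driver then needs to land in the lib directory of both installs. A small sketch of that step; FLINK_LIB and CDC_LIB below are stand-in relative paths so the snippet can run anywhere, substitute the real /data/src/flink/... directories from the sections above:

```shell
# Stand-in locations; point these at the real Flink and Flink CDC installs.
FLINK_LIB=./flink-1.19.1/lib
CDC_LIB=./flink-cdc-3.3.0/lib
JAR=mysql-connector-java-8.0.28.jar

mkdir -p "$FLINK_LIB" "$CDC_LIB"   # the real lib dirs already exist
[ -f "$JAR" ] || touch "$JAR"      # stands in for the wget download above

# Copy the JDBC driver into both lib directories.
for dest in "$FLINK_LIB" "$CDC_LIB"; do
  cp "$JAR" "$dest/"
done
ls "$FLINK_LIB" "$CDC_LIB"
```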

4. Configure Flink checkpointing to support incremental sync

  • execution.checkpointing.interval: 3000

Parameter description:

  • execution.checkpointing.interval: controls how often the Flink job takes checkpoints. Checkpointing is Flink's fault-tolerance mechanism: by periodically saving the job's state, the job can be restored from the most recent checkpoint after a failure.
  • 3000: the checkpoint interval in milliseconds (ms), so 3000 ms equals 3 seconds.
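In Flink 1.19 the default configuration file is config.yaml with nested keys, so the flat key above corresponds to the following (a sketch; the flat `execution.checkpointing.interval: 3000` form also works, and the value accepts either a plain millisecond count or a duration string like 3s):

```yaml
# $FLINK_HOME/conf/config.yaml
execution:
  checkpointing:
    interval: 3000   # milliseconds
```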

5. YAML configuration files for MySQL to Doris and StarRocks

Place the files in any directory. Note that the server-id range used below (5400-5404) should contain at least as many IDs as the source parallelism.

mysql-to-doris.yaml

source:
  type: mysql
  hostname: ip
  port: 3306
  username: *********
  password: ************
  tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfo
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: ip:8030
  username: ***********
  password: *************

route:
  - source-table: data_entry_test.debeziumOfflineClusterInfo
    sink-table: optics.debeziumOfflineClusterInfo
  - source-table: data_entry_test.debeziumRealtimeClusterInfo
    sink-table: optics.debeziumRealtimeClusterInfo

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

mysql-to-starrocks.yaml

################################################################################
# Description: Sync MySQL tables to StarRocks
################################################################################
source:
  type: mysql
  hostname: ip
  port: 3306
  username: *********
  password: **********
  tables: data_entry_test.debeziumOfflineClusterInfo,data_entry_test.debeziumRealtimeClusterInfo
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://ip:9030
  load-url: ip:8030
  username: ****************
  password: ****************

route:
  - source-table: data_entry_test.debeziumOfflineClusterInfo
    sink-table: dd_test_starrocks.debeziumOfflineClusterInfo
  - source-table: data_entry_test.debeziumRealtimeClusterInfo
    sink-table: dd_test_starrocks.debeziumRealtimeClusterInfo

pipeline:
  name: MySQL to StarRocks Pipeline
  parallelism: 6

Here jdbc-url points at the StarRocks FE MySQL-protocol query port (9030 by default), while load-url points at the FE HTTP port (8030 by default) used for Stream Load.

6. Start Flink and Flink CDC

Start Flink:

./start-cluster.sh

Submit the Flink CDC pipeline:

/data/src/flink/flink-cdc-3.3.0/bin/flink-cdc.sh \
  /data/src/flink/flink-cdc-3.3.0/conf/mysql-to-starrocks.yaml

Check the running job in the Flink web UI.

7. Inspect the Flink CDC sync logs

2025-02-18 13:48:49,973 INFO  com.starrocks.connector.flink.catalog.StarRocksCatalog       [] - Success to create table dd_test_starrocks.dd_test_starrocks, sql: CREATE TABLE IF NOT EXISTS dd_test_starrocks.debeziumOfflineClusterInfo (
id VARCHAR(21) NOT NULL,
servername VARCHAR(6168) NOT NULL,
connectorname VARCHAR(6168) NOT NULL,
databasename VARCHAR(6168) NOT NULL,
url VARCHAR(6168) NOT NULL,
topicname VARCHAR(6168) NOT NULL,
clustername VARCHAR(6168) NOT NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
2025-02-18 14:04:25,298 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (1/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:04:25,333 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> PrePartition (2/2)#0 (2069f3b2a289abd02012736f795a34b7_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from INITIALIZING to RUNNING.
2025-02-18 14:09:35,729 INFO  com.starrocks.data.load.stream.DefaultStreamLoader           [] - Stream load completed, label : flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97, database : dd_test_starrocks, table : debeziumOfflineClusterInfo, body : {
    "Status": "OK",
    "Message": "",
    "Label": "flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97",
    "TxnId": 108875857,
    "LoadBytes": 133959,
    "StreamLoadPlanTimeMs": 0,
    "ReceivedDataTimeMs": 0
}
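The stream-load result body in the log above is plain JSON, so it is easy to check programmatically when tailing logs. A small sketch, using the body copied verbatim from the DefaultStreamLoader line:

```python
import json

# Response body copied from the DefaultStreamLoader log line above.
body = """{
    "Status": "OK",
    "Message": "",
    "Label": "flink-84c2fdac-3341-4b5b-8bf1-3946098c0a97",
    "TxnId": 108875857,
    "LoadBytes": 133959,
    "StreamLoadPlanTimeMs": 0,
    "ReceivedDataTimeMs": 0
}"""

result = json.loads(body)
# A failed load would carry a non-OK Status and an error Message.
assert result["Status"] == "OK", result["Message"]
print(result["TxnId"], result["LoadBytes"])  # 108875857 133959
```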

8. Compare the MySQL and StarRocks tables

The MySQL source table:

-- data_entry_test.debeziumOfflineClusterInfo definition

CREATE TABLE `debeziumOfflineClusterInfo` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `servername` varchar(2056) NOT NULL COMMENT 'connector identifier',
  `connectorname` varchar(2056) NOT NULL COMMENT 'connector name',
  `databasename` varchar(2056) NOT NULL COMMENT 'database name',
  `url` varchar(2056) NOT NULL COMMENT 'database name',
  `topicname` varchar(2056) NOT NULL COMMENT 'topic name',
  `clustername` varchar(2056) NOT NULL COMMENT 'cluster name',
  `database_server_id` varchar(256) NOT NULL COMMENT 'cluster name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=765 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

The StarRocks table auto-created by the pipeline:

-- dd_test_starrocks.debeziumOfflineClusterInfo definition

CREATE TABLE `debeziumOfflineClusterInfo` (
  `id` varchar(21) NOT NULL COMMENT "",
  `servername` varchar(6168) NOT NULL COMMENT "",
  `connectorname` varchar(6168) NOT NULL COMMENT "",
  `databasename` varchar(6168) NOT NULL COMMENT "",
  `url` varchar(6168) NOT NULL COMMENT "",
  `topicname` varchar(6168) NOT NULL COMMENT "",
  `clustername` varchar(6168) NOT NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
);
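One detail worth noting in the auto-created DDL: MySQL VARCHAR lengths are measured in characters, while StarRocks VARCHAR lengths are measured in bytes, so the connector appears to expand the declared width by a factor of 3 (the maximum bytes per character it assumes for the source charset). The 3x factor here is my reading of the observed widths in the two DDLs, not a quote from the docs:

```python
# MySQL declares varchar(2056) (characters); the auto-created StarRocks
# table uses varchar(6168) (bytes): a 3-bytes-per-character expansion.
mysql_chars = 2056
bytes_per_char = 3
starrocks_bytes = mysql_chars * bytes_per_char
print(starrocks_bytes)  # 6168, matching the StarRocks DDL above
```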

As shown above, the table was created automatically in StarRocks, and both the historical data and the incremental data were synced successfully.

9. Flink CDC in production

  • Alibaba Cloud's modern data stack practice on the cloud, built on Flink CDC

Fine-grained schema-change policy control:

  • Supports creating tables, adding columns, renaming columns, changing column definitions, dropping columns, dropping tables, and truncating tables

When new tables are created in the upstream database, the CDC YAML pipeline can discover and sync them automatically, without reconfiguring the job. This comes in two flavors:

  • Historical data sync: enable the scan.newly-added-table.enabled option and restart the job from a savepoint to read the new tables' historical data.
  • Incremental data sync: simply enable the scan.binlog.newly-added-table.enabled option to sync the new tables' incremental data automatically.
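Both options go under the source block of the pipeline YAML. A sketch of the incremental-only case (option names as listed above; whether the two options may be combined depends on the Flink CDC version, so check the docs for yours):

```yaml
source:
  type: mysql
  hostname: ip
  port: 3306
  # A regex pattern so newly created tables in the database also match:
  tables: data_entry_test.\.*
  # Automatically pick up incremental data for tables created after startup:
  scan.binlog.newly-added-table.enabled: true
```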


