Flink-CDC 全面解析
Flink-CDC 全面解析
一、CDC 概述
(一)什么是 CDC
CDC 即 Change Data Capture(变更数据获取),其核心要义在于严密监测并精准捕获数据库内发生的各种变动情况,像数据的插入、更新以及删除操作,还有数据表相关的变动等,都会被它一一察觉。并且它会严格按照这些变动实际发生的先后顺序,毫无遗漏地完整记录下来,随后将这些变更信息写入到消息中间件里,方便其他服务按需进行订阅与消费。形象地说,它就像是数据库的“贴身管家”,时刻留意着数据库的任何“风吹草动”,一旦有变化,立马就能获取到相应信息。
(二)CDC 的种类
CDC 主要划分为基于查询和基于 Binlog 这两种方式,下面来看看它们之间的差别:
- 搭建 mysql 集群:在搭建时可以选择使用阿里巴巴的 mycat 来实现分库分表功能,以此更好地管理和扩展数据库架构。
- Canal:当我们想要知晓数据库中某个表的变动情况时,Canal 就能派上用场了,它的原理是依托 mysql 的 binlog,通过解析 binlog 来获取表的变更信息。例如,要是没有 CDC 的话,若想在大屏幕上实时展示订单的统计数据,那就得利用 Canal 去读取 mysql 里的实时订单数据,然后传递给 kafka,再由 kafka 把相关信息发送给 Flink;而要是有了 CDC,Flink 就能直接检测到 mysql 数据的变化,进而得出各项指标了。值得一提的是,CDC 底层其实内置了一个软件叫 debezium。
(三)Flink-CDC
Flink 社区精心打造了 flink-cdc-connectors 组件,这可是个功能强大的 source 组件,它具备直接从 MySQL、PostgreSQL 等数据库读取全量数据以及增量变更数据的能力,极大地拓展了 Flink 与数据库交互的便捷性。而且 Flink 还有诸如 mysql、kafka、hbase、cdc 等多种连接器,其触发器方面,默认大多是基于时间的,像 eventTime、procssingTime 等,当然也支持自定义触发器,比如在智慧交通项目中就有过相关应用。目前这个组件已经开源,开源地址为 https://github.com/ververica/flink-cdc-connectors ,方便广大开发者使用和贡献代码。
二、Flink CDC 案例实操
(一)DataStream 方式的应用
- 导入依赖:详细的依赖导入可参考 https://blog.csdn.net/mynameisgt/article/details/125826905 这个链接内容。要是启动时报错了,也不用慌,解决方案可查看 https://blog.csdn.net/qq_27721169/article/details/132151345 ,一般来说,就是修改 mysql 的驱动包的版本就行。不过要测试相关代码,还得先开启 mysql 的 binlog 日志,具体操作就是开启 MySQL Binlog 并重启 MySQL。
- 编写代码:代码编写完成后,接着要进行建库、建表操作,随后开启 mysql 的 binlog,具体可参照相应文档来执行。
- 案例测试:
- 打包并上传至 Linux:将写好的代码打包好,然后上传到 Linux 系统中,为后续部署做准备。
- 开启 MySQL Binlog 并重启 MySQL:再次强调这个操作的重要性,同时要查看 binlog 数据最新的大小,观察其前后变化情况,方便后续验证数据变更捕获是否准确。
- 创建一个表:创建一个带有随便几个字段的表,用于测试 Flink-CDC 对数据变更的捕获功能。
- 启动 Flink 集群:让 Flink 集群运行起来,为处理数据变更提供运行环境。
- 启动 HDFS 集群:启动 HDFS 集群,保障数据存储等相关功能的正常运行。
- 启动程序:正式启动编写好的 Flink-CDC 程序,开始检测数据变更情况。
- 在 MySQL 的 cdc_test.z_user_info 表中添加、修改或者删除数据:人为地在指定表中制造数据变更,以此来检验 Flink-CDC 是否能准确捕获到这些变化。
- 给当前的 Flink 程序创建 Savepoint:执行
bin/flink savepoint JobId hdfs://bigdata01:9820/flink/save
命令,创建 Savepoint,方便后续程序重启等操作时恢复状态。 - 关闭程序以后从 Savepoint 重启程序:通过
bin/flink run -s hdfs://bigdata01:9820/flink/save/... -c com.bigdata.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
命令,实现从 Savepoint 重启程序,验证程序的稳定性和数据处理的连贯性。
(二)自定义反序列化器
- 代码实现:在 Flink-CDC 中,有像 Canal、Maxwell 等相关总结内容。而且代码版本和 sql 版本存在一定区别:
- 版本支持:代码版本的 Flink 在 1.12 和 1.13 版本都支持相关操作,然而 sql 版本的 Flink 只有到 1.13 版本才支持。
- 监听范围:代码版本支持一次监听多个数据库以及多个表,功能更为强大;而 sql 版本则只支持单库单表的监听。
- 反序列化器:sql 版本中无需进行自定义反序列化器,相对简洁;但代码版本就需要自定义反序列化器了,当然,也可以选择不定义,根据具体业务场景和需求来决定。
(三)FlinkSQL 方式的应用
- 代码实现:在代码实现过程中,有一些需要特别留意的“坑”:
- jar 包版本问题:在 maven 中,各个 jar 包之间的版本有可能出现不兼容等问题,比如可能会出现
java.lang.NoSuchMethodError:scala.Predef$.refArrayOps
这样的错误,需要仔细排查和调整 jar 包版本。 - 主键问题:在 FlinkSQL 里,如果创建的表没有主键,尤其是在 Flink 1.13 版本之后,会遇到
The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled', default: true (fallback keys: [])' to 'true'
这样的限制,所以创建表的时候一定要记得加上主键。详细的常见 SQL 错误内容可参考 https://help.aliyun.com/zh/flink/support/common-sql-errors?spm=a2c4g.11186623.0.i32#section-9oq-z7x-sq0 。
- jar 包版本问题:在 maven 中,各个 jar 包之间的版本有可能出现不兼容等问题,比如可能会出现
以下是一段示例代码:
package com.bigdata.cdc;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @基本功能:
* @program:FlinkProject
* @author: 闫哥
* @create:2024-06-13 11:01:11
**/
public class CdcSQLTest {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
// 获取tableEnv对象
// 通过env 获取一个table 环境
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//2. 创建表对象
//3. 编写sql语句
//4. 将Table变为stream流
tenv.executeSql("CREATE TABLE user_info2 (\n" +
" id INT NOT NULL primary key,\n" +
" name STRING,\n" +
" age int\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'bigdata01',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'scan.startup.mode' = 'latest-offset', " +
" 'database-name' = 'cdc_test',\n" +
" 'table-name' = 'user_info'\n" +
")");
tenv.executeSql("select * from user_info2").print();
Table table = tenv.sqlQuery("select * from user_info2");
DataStream<Tuple2<Boolean, Row>> retractStream = tenv.toRetractStream(table, Row.class);
retractStream.print();
//5. execute-执行
env.execute();
}
}
在这段代码中,首先是准备 Flink 的运行环境,设置好运行模式以及并行度等基础参数,接着获取 StreamTableEnvironment
对象,用于后续的 SQL 操作。然后创建了名为 user_info2
的表,定义了表结构以及相关的连接配置信息,如连接的数据库、用户名、密码等,通过执行 SQL 查询语句并将结果转换为流的形式进行输出,最后执行整个程序,实现基于 FlinkSQL 方式对 Flink-CDC 的应用实践。