Flink CDC 生产环境常用参数总结
Flink CDC 生产环境常用参数总结
1.参数
1. 基本连接参数
这些参数用于定义如何连接到数据库,是配置的必需项。
参数名称 | 说明 | 示例 |
---|---|---|
connector | 数据库连接器类型,常用 mysql-cdc 。 | 'connector' = 'mysql-cdc' |
hostname | 数据库主机名或 IP 地址。 | 'hostname' = '192.168.1.100' |
port | 数据库端口号,MySQL 通常为 3306 。 | 'port' = '3306' |
username | 用于连接数据库的用户名。 | 'username' = 'root' |
password | 数据库用户的密码。 | 'password' = 'your-password' |
database-name | 需要监控的数据库名称,支持正则表达式。 | 'database-name' = 'production_db' |
table-name | 需要监控的表名称,支持正则表达式。 | 'table-name' = 'user.*' |
2. 数据读取控制参数
这些参数用于控制数据从数据库的读取行为,确保 Flink CDC 的运行效率和行为符合预期。
参数名称 | 说明 | 示例 |
---|---|---|
scan.startup.mode | 定义 Flink 应从哪里开始读取: | 'scan.startup.mode' = 'initial' |
- initial :从表头读取,包括快照 + 增量数据。 | ||
- latest-offset :从最新的增量变更开始读取,跳过快照。 | ||
- specific-offset :从指定 binlog 位置开始读取。 | ||
- timestamp :从指定的时间戳(毫秒)开始读取。 | ||
debezium.snapshot.mode | 定义如何执行初始快照: | 'debezium.snapshot.mode' = 'initial' |
- initial :读取历史快照 + 实时变更数据。 | ||
- never :只读取增量数据(需要确保表的历史快照无关紧要)。 | ||
scan.incremental.snapshot.enabled | 是否启用增量快照模式(适用于大表的分片读取以减少对源库压力)。 | 'scan.incremental.snapshot.enabled' = 'true' |
chunk.size | 增量快照时的分片大小(默认 1024 行)。适合大表场景调整该值,确保分片大小适中。 | 'chunk.size' = '1000' |
3. 数据库配置相关参数
这些参数主要用于适配数据库的配置,比如时区、并行度等。
参数名称 | 说明 | 示例 |
---|---|---|
server-time-zone | 数据库服务器的时区,用于解析时间字段。 | 'server-time-zone' = 'Asia/Shanghai' |
heartbeat.interval | 心跳信号间隔(毫秒)。用于检测 CDC 连接的可用性和活跃状态。 | 'heartbeat.interval' = '5000' |
database.history | 数据库变更历史记录的存储方式,常用 memory 。 | 'debezium.database.history' = 'memory' |
4. 数据格式参数
用于定义输出数据的格式,生产环境中常用 JSON 格式(debezium-json
或 canal-json
),方便和下游系统对接。
参数名称 | 说明 | 示例 |
---|---|---|
format | 定义数据的序列化格式,常用 debezium-json 或 canal-json 。 | 'format' = 'debezium-json' |
debezium.include.schema.changes | 是否包含 DDL 信息。对于需要捕获表结构变更的场景,设置为 true 。 | 'debezium.include.schema.changes' = 'false' |
5. 容错与性能优化参数
这些参数用于优化数据处理的性能,减少对生产数据库的影响。
参数名称 | 说明 | 示例 |
---|---|---|
scan.incremental.snapshot.chunk.key-columns | 分片的键列(如主键或唯一索引列)。适用于增量快照优化,确保分片能均匀分布。 | 'scan.incremental.snapshot.chunk.key-columns' = 'id' |
parallelism | 定义并行度,适用于高并发数据流场景。 | 'parallelism' = '4' |
debezium.snapshot.locking.mode | 定义快照时的表锁行为: | 'debezium.snapshot.locking.mode' = 'none' |
- none :无锁读取(推荐,前提是数据库支持一致性读取)。 | ||
- exclusive :对表进行排他锁操作(避免数据变更造成不一致)。 |
6. 示例生产环境配置
以下是一个生产环境中常见的 Flink CDC 表配置示例:
CREATE TABLE mysql_cdc_table (
id INT NOT NULL,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.100',
'port' = '3306',
'username' = 'cdc_user',
'password' = 'your-password',
'database-name' = 'production_db',
'table-name' = 'user',
'scan.startup.mode' = 'initial',
'debezium.snapshot.mode' = 'initial',
'scan.incremental.snapshot.enabled' = 'true',
'chunk.size' = '1000',
'server-time-zone' = 'Asia/Shanghai',
'format' = 'debezium-json',
'heartbeat.interval' = '5000'
);
2.参数调优建议
1. 参数调优
-
小表或中等规模表:
- 启用默认快照模式(
scan.startup.mode=initial
)即可满足需求。
- 启用默认快照模式(
-
大表:
- 启用增量快照(
scan.incremental.snapshot.enabled=true
),并设置适当的chunk.size
和scan.incremental.snapshot.chunk.key-columns
。
- 启用增量快照(
-
表锁策略:
- 尽量使用无锁模式(
debezium.snapshot.locking.mode=none
),避免对生产库的影响。
- 尽量使用无锁模式(
-
并行度:
- 根据表的规模和数据量调整
parallelism
,一般建议为 CPU 核心数的 2 倍。
- 根据表的规模和数据量调整
2. 特殊场景注意事项
-
DDL 捕获:(很鸡肋)
- 如果需要捕获表结构变更(如新增字段),需要启用
debezium.include.schema.changes
。
- 如果需要捕获表结构变更(如新增字段),需要启用
-
分区表支持:
- 对于分区表,确保
database-name
和table-name
使用正则表达式定义范围。
- 对于分区表,确保
-
无主键表:
- 对于无主键的表,数据会被全量更新,容易产生性能瓶颈,建议增加主键或唯一索引。
-
数据延迟监控:
- 配置
heartbeat.interval
,确保实时监控 CDC 连接是否正常,并能快速恢复数据流。
- 配置