Flink CDC 源码解析--整体流程
背景
本文基于 Flink CDC 3.2.0
最近在做了一个实时数据抽取的工作,也就是比较简单的从mysql到Starrocks的工作,其实这种有很多中实现,比如说
- flink SQL
mysql -> starrocks
-
flink jar
写代码直接读取mysql的数据,之后写入到Starrocks
其他的用Spark Streaming
或者Structured Streaming
这种就不说了,毕竟在实时同步这块,Flink
还是占据了很大的优势 -
canal + kafka + starrocks
这里流程还需要中转到kakfa,流程繁琐冗长
但是以上几种方式都是得写SQL或者写代码实现,这种是要一定的专业知识和门槛的,恰好 FLink CDC 提供了一种yaml文件配置化的方式来降低这种门槛,不需要掌握专业知识就能够进行快速的进行数据传输的工作,在此分析一下 Flink CDC的整个流程
分析
Demo
首先先见一下这种快速开发数据传输的Demo:
source:
type: mysql
hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: order_dw_mysql.\.*
server-id: 5405-5415
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
username: ${secret_values.starrocksusername}
password: ${secret_values.starrockspassword}
table.create.properties.replication_num: 1
route:
- source-table: order_dw_mysql.\.*
sink-table: order_dw_sr.<>
replace-symbol: <>
description: route all tables in source_db to sink_db
pipeline:
name: Sync MySQL Database to StarRocks
schema.change.pipeline: EVOLVE
parallelism: 1
只需要配置 source sink pipline 就可以快速的进行数据的传输工作,在flink后台最终形成的物理执行图为:
而且如果库表存在,则不会创建库表,否则会自动创建。
源码分析
首先是入口 CliFrontend
的 main方法:
Options cliOptions = CliFrontendOptions.initializeOptions();
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(cliOptions, args);
// Help message
if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(4);
formatter.setWidth(80);
formatter.printHelp(" ", cliOptions);
return;
}
// Create executor and execute the pipeline
PipelineExecution.ExecutionInfo result = createExecutor(commandLine).run();
// Print execution result
printExecutionInfo(result);
- 首先是参数解析,解析
bash bin/flink-cdc.sh /path/mysql-to-doris.yaml
所带的参数 - 再者createExecutor方法构建
CliExecutor
,这里主要配置一些Flink的参数等
重要的是run
方法:
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();
这里主要就是解析 yaml文件,以及根据source sink构建 pipeline并生成 filnk DataStream 任务 ,这种方式和 flink SQL的方式很像,只不过 SQL方式 是 解析SQL形成对应的flink DataStream任务
- YamlPipelineDefinitionParser.parser
根据用户的配置,解析 SourceDef/ SinkDef /RouteDefs /TransformDefs/ UdfDef,并组装成PipelineDef
- FlinkPipelineComposer.compose
根据上个步骤生成的PipelineDef
,解析为Flink DataStream 流,这里会根据用户配置不同,最终生成的物理计划也会不同public PipelineExecution compose(PipelineDef pipelineDef) { int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); env.getConfig().setParallelism(parallelism); SchemaChangeBehavior schemaChangeBehavior = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR); // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream<Event> stream = sourceTranslator.translate( pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); stream = transformTranslator.translatePreTransform( stream, pipelineDef.getTransforms(), pipelineDef.getUdfs()); // Schema operator SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator( schemaChangeBehavior, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID), pipelineDef .getConfig() .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT)); OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); // Build PostTransformOperator for processing Data Event stream = transformTranslator.translatePostTransform( stream, pipelineDef.getTransforms(), pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), pipelineDef.getUdfs()); // Build DataSink in advance as schema operator requires MetadataApplier DataSinkTranslator sinkTranslator = new DataSinkTranslator(); DataSink dataSink = sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env); stream = schemaOperatorTranslator.translate( stream, parallelism, dataSink.getMetadataApplier() .setAcceptedSchemaEvolutionTypes( pipelineDef.getSink().getIncludedSchemaEvolutionTypes()), pipelineDef.getRoute()); // Build Partitioner used to shuffle Event PartitioningTranslator partitioningTranslator = new PartitioningTranslator(); stream = partitioningTranslator.translate( stream, parallelism, parallelism, schemaOperatorIDGenerator.generate(), dataSink.getDataChangeEventHashFunctionProvider(parallelism)); // Build Sink Operator sinkTranslator.translate( pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate()); // Add framework JARs addFrameworkJars(); return new FlinkPipelineExecution( env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking); }
-
根据在 pipline 配置的parallelism 设置整个任务的并行度
-
DataSourceTranslator 根据 SourceDef,生成
DataStreamSource
这里主要是通过createDataSource 方法,利用JAVA SPI
机制找出DataSourceFactory
的实现类,对应我们这里就是MySqlDataSourceFactory
所以说 这里的source的配置都是可以参考MySqlDataSourceFactory
的配置的 -
TransformTranslator 也是类似,读者自行阅读
-
DataSinkTranslator 这里会根据 SinkDef 生成 DataSink, 这个在后续的 PartitioningTranslator 以及 schemaOperatorTranslator 中都有用到
这里也是利用JAVA SPI
机制找出DataSinkFactory
的实现类,对应我们这里就是StarRocksDataSinkFactory
,所以说这里的配置也是可以参考StarRocksDataSinkFactory
的配置的- 在 schemaOperatorTranslator 中主要根据 getIncludedSchemaEvolutionTypes 方法获取可以变更的事件,这会综合 用户在sink中配置的
exclude.schema.changes
和include.schema.changes
- 在 schemaOperatorTranslator 中主要根据 getIncludedSchemaEvolutionTypes 方法获取可以变更的事件,这会综合 用户在sink中配置的
-
SchemaOperatorTranslator 这里会根据 pipline配置的
schema.change.behavior
值(我们这里是LENIENT
)来确定SchemaRegistryProvider
的行为
这里的链路读者自己去缕,最后会调用到SchemaRegistryRequestHandler.applySchemaChange
方法:private void applySchemaChange( TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) { for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { currentIgnoredSchemaChanges.add(changeEvent); continue; } } if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) { LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId); currentIgnoredSchemaChanges.add(changeEvent); } else { try { metadataApplier.applySchemaChange(changeEvent); schemaManager.applyEvolvedSchemaChange(changeEvent); currentFinishedSchemaChanges.add(changeEvent); } catch (Throwable t) { ... if (!shouldIgnoreException(t)) { currentChangeException = t; break; } else { ... } } } ... }
对于 applyCreateTable 这里会有 如果库表存在就会不创建的判断:- 如果传递下来的事件不是`create.table`,且`schema.change.behavior` 配置的是 IGNORE,则忽略发送事件 - 如果下游(也就是sink)不能够接受事件类型,也忽略发送该事件 对应到我们的sink是`StarRocksMetadataApplier`: ``` public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() { return Sets.newHashSet( SchemaChangeEventType.CREATE_TABLE, SchemaChangeEventType.ADD_COLUMN, SchemaChangeEventType.DROP_COLUMN); ``` - 否则 就会进行应用: ``` if (schemaChangeEvent instanceof CreateTableEvent) { applyCreateTable((CreateTableEvent) schemaChangeEvent); } else if (schemaChangeEvent instanceof AddColumnEvent) { applyAddColumn((AddColumnEvent) schemaChangeEvent); } else if (schemaChangeEvent instanceof DropColumnEvent) { applyDropColumn((DropColumnEvent) schemaChangeEvent); } else if (schemaChangeEvent instanceof RenameColumnEvent) { applyRenameColumn((RenameColumnEvent) schemaChangeEvent); } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) { applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent); } else { throw new UnsupportedSchemaChangeEventException(schemaChangeEvent); }
if (!catalog.databaseExists(starRocksTable.getDatabaseName())) { catalog.createDatabase(starRocksTable.getDatabaseName(), true); } try { catalog.createTable(starRocksTable, true);// 这里的true就是 ignoreIfExists 为 true
- 组装成 FlinkPipelineExecution返回
-
- PipelineExecution.execute 运行flink 任务
这和以DataFrame api写Flink jar任务一样,调用env.executeAsync
方法运行.