当前位置: 首页 > article >正文

Flink CDC 源码解析--整体流程

背景

本文基于 Flink CDC 3.2.0
最近在做了一个实时数据抽取的工作,也就是比较简单的从mysql到Starrocks的工作,其实这种有很多中实现,比如说

  1. flink SQL
mysql -> starrocks
  1. flink jar
    写代码直接读取mysql的数据,之后写入到Starrocks
    其他的用 Spark Streaming 或者 Structured Streaming这种就不说了,毕竟在实时同步这块,Flink 还是占据了很大的优势

  2. canal + kafka + starrocks
    这里流程还需要中转到kakfa,流程繁琐冗长

但是以上几种方式都是得写SQL或者写代码实现,这种是要一定的专业知识和门槛的,恰好 FLink CDC 提供了一种yaml文件配置化的方式来降低这种门槛,不需要掌握专业知识就能够进行快速的进行数据传输的工作,在此分析一下 Flink CDC的整个流程

分析

Demo

首先先见一下这种快速开发数据传输的Demo:

source:
  type: mysql
  hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: order_dw_mysql.\.*
  server-id: 5405-5415

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
  load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
  username: ${secret_values.starrocksusername}
  password: ${secret_values.starrockspassword}
  table.create.properties.replication_num: 1
  
route:
  - source-table: order_dw_mysql.\.*
    sink-table: order_dw_sr.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

pipeline:
  name: Sync MySQL Database to StarRocks
  schema.change.pipeline: EVOLVE
  parallelism: 1

只需要配置 source sink pipline 就可以快速的进行数据的传输工作,在flink后台最终形成的物理执行图为:
在这里插入图片描述

而且如果库表存在,则不会创建库表,否则会自动创建。

源码分析

首先是入口 CliFrontend的 main方法:

        Options cliOptions = CliFrontendOptions.initializeOptions();
        CommandLineParser parser = new DefaultParser();
        CommandLine commandLine = parser.parse(cliOptions, args);

        // Help message
        if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.setLeftPadding(4);
            formatter.setWidth(80);
            formatter.printHelp(" ", cliOptions);
            return;
        }

        // Create executor and execute the pipeline
        PipelineExecution.ExecutionInfo result = createExecutor(commandLine).run();

        // Print execution result
        printExecutionInfo(result);

  • 首先是参数解析,解析 bash bin/flink-cdc.sh /path/mysql-to-doris.yaml 所带的参数
  • 再者createExecutor方法构建CliExecutor,这里主要配置一些Flink的参数等

重要的是run方法:

            PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
            PipelineDef pipelineDef =
                    pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
            // Create composer
            PipelineComposer composer = getComposer();
            // Compose pipeline
            PipelineExecution execution = composer.compose(pipelineDef);
            // Execute or submit the pipeline
            return execution.execute();

这里主要就是解析 yaml文件,以及根据source sink构建 pipeline并生成 filnk DataStream 任务 ,这种方式和 flink SQL的方式很像,只不过 SQL方式 是 解析SQL形成对应的flink DataStream任务

  • YamlPipelineDefinitionParser.parser
    根据用户的配置,解析 SourceDef/ SinkDef /RouteDefs /TransformDefs/ UdfDef,并组装成PipelineDef
  • FlinkPipelineComposer.compose
    根据上个步骤生成的PipelineDef,解析为Flink DataStream 流,这里会根据用户配置不同,最终生成的物理计划也会不同
         public PipelineExecution compose(PipelineDef pipelineDef) {
          int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
          env.getConfig().setParallelism(parallelism);
    
          SchemaChangeBehavior schemaChangeBehavior =
                  pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
    
          // Build Source Operator
          DataSourceTranslator sourceTranslator = new DataSourceTranslator();
          DataStream<Event> stream =
                  sourceTranslator.translate(
                          pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
    
          // Build PreTransformOperator for processing Schema Event
          TransformTranslator transformTranslator = new TransformTranslator();
          stream =
                  transformTranslator.translatePreTransform(
                          stream, pipelineDef.getTransforms(), pipelineDef.getUdfs());
    
          // Schema operator
          SchemaOperatorTranslator schemaOperatorTranslator =
                  new SchemaOperatorTranslator(
                          schemaChangeBehavior,
                          pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
                          pipelineDef
                                  .getConfig()
                                  .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
          OperatorIDGenerator schemaOperatorIDGenerator =
                  new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
    
          // Build PostTransformOperator for processing Data Event
          stream =
                  transformTranslator.translatePostTransform(
                          stream,
                          pipelineDef.getTransforms(),
                          pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                          pipelineDef.getUdfs());
    
          // Build DataSink in advance as schema operator requires MetadataApplier
          DataSinkTranslator sinkTranslator = new DataSinkTranslator();
          DataSink dataSink =
                  sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
    
          stream =
                  schemaOperatorTranslator.translate(
                          stream,
                          parallelism,
                          dataSink.getMetadataApplier()
                                  .setAcceptedSchemaEvolutionTypes(
                                          pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                          pipelineDef.getRoute());
    
          // Build Partitioner used to shuffle Event
          PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
          stream =
                  partitioningTranslator.translate(
                          stream,
                          parallelism,
                          parallelism,
                          schemaOperatorIDGenerator.generate(),
                          dataSink.getDataChangeEventHashFunctionProvider(parallelism));
    
          // Build Sink Operator
          sinkTranslator.translate(
                  pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
    
          // Add framework JARs
          addFrameworkJars();
    
          return new FlinkPipelineExecution(
                  env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
      }
    
     
    
    • 根据在 pipline 配置的parallelism 设置整个任务的并行度

    • DataSourceTranslator 根据 SourceDef,生成 DataStreamSource
      这里主要是通过createDataSource 方法,利用JAVA SPI机制找出 DataSourceFactory的实现类,对应我们这里就是MySqlDataSourceFactory
      所以说 这里的source的配置都是可以参考MySqlDataSourceFactory的配置的

    • TransformTranslator 也是类似,读者自行阅读

    • DataSinkTranslator 这里会根据 SinkDef 生成 DataSink, 这个在后续的 PartitioningTranslator 以及 schemaOperatorTranslator 中都有用到
      这里也是利用JAVA SPI机制找出 DataSinkFactory的实现类,对应我们这里就是StarRocksDataSinkFactory,所以说这里的配置也是可以参考StarRocksDataSinkFactory的配置的

      • 在 schemaOperatorTranslator 中主要根据 getIncludedSchemaEvolutionTypes 方法获取可以变更的事件,这会综合 用户在sink中配置的
        exclude.schema.changesinclude.schema.changes
    • SchemaOperatorTranslator 这里会根据 pipline配置的 schema.change.behavior 值(我们这里是 LENIENT)来确定SchemaRegistryProvider的行为
      这里的链路读者自己去缕,最后会调用到 SchemaRegistryRequestHandler.applySchemaChange方法:

      private void applySchemaChange(
             TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
         for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
             if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
                 if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                     currentIgnoredSchemaChanges.add(changeEvent);
                     continue;
                 }
             }
             if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
                 LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
                 currentIgnoredSchemaChanges.add(changeEvent);
             } else {
                 try {
                     metadataApplier.applySchemaChange(changeEvent);
                     schemaManager.applyEvolvedSchemaChange(changeEvent);
                     currentFinishedSchemaChanges.add(changeEvent);
                 } catch (Throwable t) {
                     ...
                     if (!shouldIgnoreException(t)) {
                         currentChangeException = t;
                         break;
                     } else {
                       ...
                     }
                 }
             }
             ...
         }
      
    }
    -  如果传递下来的事件不是`create.table`,且`schema.change.behavior` 配置的是 IGNORE,则忽略发送事件
    -  如果下游(也就是sink)不能够接受事件类型,也忽略发送该事件
       对应到我们的sink是`StarRocksMetadataApplier`:
       ```
          public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
              return Sets.newHashSet(
                SchemaChangeEventType.CREATE_TABLE,
                SchemaChangeEventType.ADD_COLUMN,
                SchemaChangeEventType.DROP_COLUMN);
       ```
    - 否则 就会进行应用:
      ```
        if (schemaChangeEvent instanceof CreateTableEvent) {
            applyCreateTable((CreateTableEvent) schemaChangeEvent);
        } else if (schemaChangeEvent instanceof AddColumnEvent) {
            applyAddColumn((AddColumnEvent) schemaChangeEvent);
        } else if (schemaChangeEvent instanceof DropColumnEvent) {
            applyDropColumn((DropColumnEvent) schemaChangeEvent);
        } else if (schemaChangeEvent instanceof RenameColumnEvent) {
            applyRenameColumn((RenameColumnEvent) schemaChangeEvent);
        } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
            applyAlterColumn((AlterColumnTypeEvent) schemaChangeEvent);
        } else {
            throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
        }
    
    
    对于 applyCreateTable 这里会有 如果库表存在就会不创建的判断:
       if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
           catalog.createDatabase(starRocksTable.getDatabaseName(), true);
       }
    
       try {
           catalog.createTable(starRocksTable, true);// 这里的true就是 ignoreIfExists 为 true
    
    • 组装成 FlinkPipelineExecution返回
  • PipelineExecution.execute 运行flink 任务
    这和以DataFrame api写Flink jar任务一样,调用env.executeAsync方法运行.

http://www.kler.cn/a/394146.html

相关文章:

  • 分子动力学中优化算法和积分算法
  • Linux应用软件编程-多任务处理(进程)
  • 怎么把多个PDF合并到一起-免费实用PDF编辑处理工具分享
  • bash shell的条件语句
  • [C/C++]new/delete 和 malloc/free 的区别?
  • 可灵1.6正式上线,图生视频再创新视界
  • 行业类别-智慧城市-子类别智能交通-细分类别自动驾驶技术-应用场景城市公共交通优化
  • 霞智科技Titan 810荣获TÜV南德欧盟CE-MD认证证书
  • C++入门基础知识149—【关于C++ 关系运算符重载】
  • Node-RED - 编辑器添加用户认证
  • 深度学习之其他常见的生成式模型
  • FairyGUI和Unity联动(入门篇)
  • 第四十章 Vue之使用ESLint修正代码规范
  • C/C++语言基础--C++模板与元编程系列六,C++元编程相关库的讲解与使用
  • 七次课掌握 Photoshop:形状和文字
  • HTTP 1.0、HTTP 1.1 和 HTTP 2.0 区别
  • 《物理学报》
  • jmeter常用配置元件介绍总结之线程组
  • MySQL 8.0特性-自增变量的持久化
  • linux系统网络设置之ssh和nfs
  • Ubuntu 22.04.4 LTS + certbot 做自动续签SSL证书(2024-11-14亲测)
  • 【C#设计模式(9)——组合模式(Component Pattern)】
  • STM32设计学生宿舍监测控制系统
  • 基于Affine-Sift算法的图像配准matlab仿真
  • 【卡尔曼滤波】数据融合Fusion的应用 C语言、Python实现(Kalman Filter)
  • Scala 的Set集合