当前位置: 首页 > article >正文

Flink CDC系列之:学习理解核心概念——Data Pipeline

Flink CDC系列之:学习理解核心概念——Data Pipeline

  • 数据管道
  • source
  • sink
  • 管道配置
  • Table ID
  • route
  • transform
  • 案例

数据管道

由于 Flink CDC 中的事件以管道方式从上游流向下游,因此整个 ETL 任务被称为数据管道。

管道对应于 Flink 中的一系列操作。

要描述数据管道,需要以下部分:

  • source
  • sink
  • pipeline

以下部分是可选的:

  • route
  • transform

source

数据源用于访问元数据并从外部系统读取更改的数据。

数据源可以同时从多个表读取数据。

  • Flink CDC系列之:学习理解核心概念——Data Source

sink

数据接收器用于应用架构更改并将更改数据写入外部系统。

数据接收器可以同时写入多个表。

  • Flink CDC系列之:学习理解核心概念——Data Sink

管道配置

支持以下数据管道级别的配置选项:

参数含义可选/必需
name管道的名称,将作为作业名称提交给Flink集群。可选
parallelism管道的全局并行度。默认为 1。可选
local-time-zone本地时区定义当前会话时区id。可选
   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2
     user-defined-function:
       - name: addone
         classpath: com.example.functions.AddOneFunctionClass
       - name: format
         classpath: com.example.functions.FormatFunctionClass

Table ID

在连接外部系统时,需要与外部系统的存储对象建立映射关系,这就是 Table Id 所指的。

为了兼容大多数外部系统,Table Id 用三元组表示:(namespace, schemaName, tableName)。

连接器应该建立 Table Id 与外部系统中存储对象的映射。

下表列出了不同数据系统的 Table Id 中的部分:
在这里插入图片描述

route

Route 指定匹配一串 source-table 到 sink-table 的规则,最典型的场景是分库分表合并,将多个上游 source 表路由到同一张 sink 表。

  • Flink CDC系列之:学习理解核心概念——Route

transform

Transform模块帮助用户根据表中的数据列进行数据列的删除和扩展。
此外,它还可以帮助用户在同步过程中过滤一些不必要的数据。

  • Flink CDC系列之:学习理解核心概念——Transform

案例

我们可以使用以下 yaml 文件来定义一个简洁的数据管道,描述将 MySQL app_db 数据库下的所有表同步到 Doris:

 source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   transform:
     - source-table: adb.web_order01
       projection: \*, UPPER(product_name) as product_name
       filter: id > 10 AND order_id > 100
       description: project fields and filter
     - source-table: adb.web_order02
       projection: \*, UPPER(product_name) as product_name
       filter: id > 20 AND order_id > 200
       description: project fields and filter

   route:
     - source-table: app_db.orders
       sink-table: ods_db.ods_orders
     - source-table: app_db.shipments
       sink-table: ods_db.ods_shipments
     - source-table: app_db.products
       sink-table: ods_db.ods_products

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2

我们可以使用以下 yaml 文件来定义一个复杂的数据管道,描述将 MySQL app_db 数据库下的所有表同步到 Doris,并给出特定的目标数据库名称 ods_db 和特定的目标表名称前缀 ods_ :

 source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   transform:
     - source-table: adb.web_order01
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 10 AND order_id > 100
       description: project fields and filter
     - source-table: adb.web_order02
       projection: \*, format('%S', product_name) as product_name
       filter: addone(id) > 20 AND order_id > 200
       description: project fields and filter

   route:
     - source-table: app_db.orders
       sink-table: ods_db.ods_orders
     - source-table: app_db.shipments
       sink-table: ods_db.ods_shipments
     - source-table: app_db.products
       sink-table: ods_db.ods_products

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2
     user-defined-function:
       - name: addone
         classpath: com.example.functions.AddOneFunctionClass
       - name: format
         classpath: com.example.functions.FormatFunctionClass

http://www.kler.cn/news/366430.html

相关文章:

  • python实战(一)——iris鸢尾花数据集分类
  • Python | Leetcode Python题解之第508题出现次数最多的子树元素和
  • [RK3566-Android11] 使用SPI方式点LED灯带-JE2815/WS2812,实现呼吸/渐变/随音量变化等效果
  • django5入门【03】新建一个hello界面
  • 基于vue框架的的高校消防设施管理系统06y99(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。
  • VoLTE 微案例:VoLTE 注册失败,I-CSCF 返回 403,HSS(UAR) 返回 5001
  • MySQL 二进制和中继日志管理
  • STM32L031F6P6开发环境搭建
  • 隨筆 20241023 Kafka 的幂等性与分区顺序性探讨
  • excel斜线表头
  • python爬虫:实例讲解xpatch的基本使用
  • 人工智能在自然语言处理(NLP)中的应用
  • Redis面试题扩展
  • .NET Core WebApi第2讲:前后端分离,Restful
  • unity URP下VolumetricFog插件发布的时候安卓里没有显示问题
  • C++二级2021年9月试卷及答案
  • Python——脚本实现datax全量同步mysql到hive
  • (北京政务服务满意度公司)满意度调查助力服务质量提升
  • 【Java】类来管理个人简历信息
  • UWB物资定位标签
  • C2W4.LAB.Word_Embedding.Part1
  • 智能听诊器:宠物医疗行业的新动力
  • ubuntu修改默认开机模式(图形/终端)
  • lucene数据写入-02倒排数据缓存组织
  • 【c++ arx 选项板2】
  • Python Pandas 数据分析的得力工具:简介