数据湖Paimon入门指南
一、主键表(Primary Key Table)
Merge Engines
sink-upsert-realize可能会导致不正常的现象。当输入乱序时,我们建议您使用序列字段来纠正无序。建议设置为None
set table.exec.sink.upsert-materialize = NONE
Deduplicate(重复数据消除)
如果用户建表时不指定 merge-engine 配置,创建的 PK 表默认的 Merge Engine 是 deduplicate 即只保留最新的记录,其他的同 PK 数据则被丢弃,如果最新的记录是 DELETE 记录,那么相同 PK 的所有数据都将被删除。
'merge-engine' = 'Deduplicate'
Partial Update
partial-update 必须跟 lookup 或者 full-compaction changelog producer结合使用。Partial无法接收DELETE消息,可以将partial-update.ignore-delete配置为忽略delete消息。
如果用户建表时指定'merge-engine' = 'partial-update',那么就会使用部分更新表引擎,可以做到多个 Flink 流任务去更新同一张表,每条流任务只更新一张表的部分列,最终实现一行完整的数据的更新,对于需要拉宽表的业务场景,partial-update 非常适合此场景,而且构建宽表的操作也相对简单。这里所说的多个 Flink 流任务并不是指多个 Flink Job 并发写同一张 Paimon 表,这样需要拆分 Compaction 任务,就不能在每个 Job 的 Writer 端做 Compaction, 需要一个独立的 Compaction 任务,比较麻烦。目前推荐将多条 Flink 流任务 UNION ALL 起来,启动一个 Job 写 Paimon 表。这里需要注意的是,对于流读场景,partial-update 表引擎需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,同时 partial-update 不能接收和处理 DELETE 消息,为了避免接收到 DELETE 消息报错,需要通过配置 'partial-update.ignore-delete' = 'true' 忽略 DELETE 消息。
--创建Partial update结果表
CREATE TABLE if not EXISTS paimon.dw.order_detail
(
`order_id` string
,`product_type` string
,`plat_name` string
,`ref_id` bigint
,`start_city_name` string
,`end_city_name` string
,`create_time` timestamp(3)
,`update_time` timestamp(3)
,`dispatch_time` timestamp(3)
,`decision_time` timestamp(3)
,`finish_time` timestamp(3)
,`order_status` int
,`binlog_time` bigint
,PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'bucket' = '20', -- 指定20个bucket
'bucket-key' = 'order_id',
'sequence.field' = 'binlog_time', -- 记录排序字段
'changelog-producer' = 'full-compaction', -- 选择 full-compaction ,在compaction后产生完整的changelog
'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间
'merge-engine' = 'partial-update',
'partial-update.ignore-delete' = 'true' -- 忽略DELETE数据,避免运行报错
);
Aggregation
如果用户建表时指定 'merge-engine' = 'aggregation',此时使用聚合表引擎,可以通过聚合函数做一些预聚合,每个除主键以外的列都可以指定一个聚合函数,相同主键的数据就可以按照列字段指定的聚合函数进行相应的预聚合,如果不指定则默认为 last-non-null-value ,空值不会覆盖。Agg 表引擎也需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,需要注意的是除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配置 'fields.${field_name}.ignore-retract'='true' 忽略。
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
change producer
Changelog 主要应用在流读场景。流式查询将不断产生最新的更改。这些更改可以来自底层表文件,也可以来自像Kafka这样的外部日志系统。与外部日志系统相比,表文件的更改成本较低,但延迟较高(取决于创建快照的频率)。
通过在创建表时指定变更日志生产者表属性,用户可以选择从文件中生成的变更模式。
目前数仓分层是在 Paimon 里做的,数据以 Table Format 的形式存储在文件系统上,如果下游的 Flink 任务要流读 Paimon 表数据,需要存储帮助生成 Changelog(成本较低,但延迟相对较高),以便下游流读的,这时就需要我们在建表时指定 Paimon 的 Changelog Producer 决定以何种方式在何时生成 Changelog。如果不指定则不会在写入 Paimon 表的时候生成 Changelog,那么下游任务需要在流读时生成一个物化节点来产生 Changelog。这种方式的成本相对较高,同时官方不建议这样使用,因为下游任务在 State 中存储一份全量的数据,即每条数据以及其变更记录都需要保存在状态中。
Paimon 支持的 Changelog Produer 包括:
none:如果不指定,默认就是 none,成本较高,不建议使用。
总之,'changelog- producer' = 'none' 最适合数据库系统这样的消费者。Flink还有一个内置的“normalize”操作符,它将每个键的值保持在状态中。可以很容易地看出,这种操作成本非常高,应该避免。
input:如果我们的 Source 源是业务库的 Binlog ,即写入 Paimon 表 Writer 任务的输入是完整的 Changelog,此时能够完全依赖输入端的 Changelog, 并且将输入端的 Changelog 保存到 Paimon 的 Changelog 文件,由 Paimon Source 提供给下游流读。通过配置 'changelog-producer' = 'input',将 Changelog Producer 设置为 input 。
通过指定'changelog-producer' = 'input',Paimon编写器将其输入作为完整变更日志的来源。所有输入记录将保存在单独的变更日志文件中,并由Paimon来源提供给消费者。最适合cdc或者flink有状态计算场景。
以下为一个flink cdc如湖的demo
EXECUTE CDCSOURCE cdc_demo WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'username',
'password' = 'password',
'checkpoint' = '30000',
'scan.startup.mode' = 'initial',
'source.server-time-zone' = 'Asia/Tokyo',
'parallelism' = '4',
'database-name' = 'demo',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'fts_hive',
'sink.catalog.type' = 'fts_hive',
'sink.catalog.uri' = 'thrift://localhost:9083',
'sink.bucket' = '4',
'sink.snapshot.time-retained' = '24h',
'table-list' = 'A01,A02,A03,A04,A05',
'sink.changelog-producer' = 'input',
'sink.catalog.warehouse' = 'hdfs://cluster/warehouse/table_store',
'sink.sink.db' = 'fts_ods_db_demo'
);
lookup:如果我们的输入不是完整的 Changelog, 并且不想在下游流读时通过 Normalize 节点生成 Changelog, 通过配置 'changelog-producer' = 'lookup',通过 Lookup 的方式在数据写入的时候生成 Changelog,此 Changelog Produer 目前处于实验状态,暂未经过大量的生产验证。
Paimon将在提交数据写入之前通过'lookup'生成变更日志。
详细:
数据湖paimon入门指南(sink,upsert) - AI牛丝