flink写入hudi MOR表
第一步:创建flink内存表从kafka读取数据:
DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(RCLNT,RLDNR,RRCTY,RVERS,RYEAR,ROBJNR,COBJNR,SOBJNR,RTCUR,RUNIT,DRCRK,RPMAX) NOT ENFORCED
) with (
'connector'='kafka',
'topic'='GLFUNCT_DEBEZIUM_TRANSFER',
--'scan.startup.mode'='earliest-offset',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='1725811200000',
'properties.group.id'='KAFKA_GLFUNCT_CHANGELOG_HUDI7',
'properties.bootstrap.servers'='10.66.28.69:9092,10.66.28.70:9092,10.66.28.61:9092',
'value.format'='debezium-json',
'scan.topic-partition-discovery.interval' = '10000',
'value.debezium-json.ignore-parse-errors' = 'true'
);
第二步:创建MOR类型的hudi表
DROP TABLE IF EXISTS HUDI_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(ID,NAME) NOT ENFORCED
)with (
'connector' = 'hudi',
'path' = 'hdfs://nameservice1/user/hive/warehouse/hudi_ods_sap.db/HUDI_ZHANG',
'table.type' = 'MERGE_ON_READ',
'hive_sync.skip_ro_suffix' = 'true',
'hoodie.datasource.write.recordkey.field' = 'ID,NAME',
'write.operation' = 'upsert',
--'write.precombine.field' = 'ETL_DT',
'write.tasks' = '4',
'index.bootstrap.enabled' = 'true',
'write.insert.drop.duplicates'='true',
'compaction.tasks' = '4',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'time_elapsed',
'compaction.delta_seconds' = '1200',
'changelog.enabled' = 'true',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://pld3cwztmg01:9083',
--'hive_sync.jdbc_url' = 'jdbc:hive2://pld3cwztmg01:10000',
'hive_sync.table' = 'ZHANG',
'hive_sync.db' = 'hudi_ods_sap',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);
第三步:把kafka表写入到hudi表即可
insert into HUDI_ZHANG select * from HUDI_KAFKA_DEBEZIUM_ZHANG where RCLNT = '300';
以上就是从kafka读取数据写入到hudi表,且表类型是MOR。