当前位置: 首页 > article >正文

flink写入hudi MOR表

第一步:创建flink内存表从kafka读取数据:

DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(RCLNT,RLDNR,RRCTY,RVERS,RYEAR,ROBJNR,COBJNR,SOBJNR,RTCUR,RUNIT,DRCRK,RPMAX) NOT ENFORCED
) with (
'connector'='kafka',
'topic'='GLFUNCT_DEBEZIUM_TRANSFER',
--'scan.startup.mode'='earliest-offset',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='1725811200000',
'properties.group.id'='KAFKA_GLFUNCT_CHANGELOG_HUDI7',
'properties.bootstrap.servers'='10.66.28.69:9092,10.66.28.70:9092,10.66.28.61:9092',
'value.format'='debezium-json',
'scan.topic-partition-discovery.interval' = '10000',
'value.debezium-json.ignore-parse-errors' = 'true'
);

第二步:创建MOR类型的hudi表

DROP TABLE IF EXISTS HUDI_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(ID,NAME) NOT ENFORCED
)with (
'connector' = 'hudi',
'path' = 'hdfs://nameservice1/user/hive/warehouse/hudi_ods_sap.db/HUDI_ZHANG',
'table.type' = 'MERGE_ON_READ',
'hive_sync.skip_ro_suffix' = 'true',
'hoodie.datasource.write.recordkey.field' = 'ID,NAME',
'write.operation' = 'upsert',
--'write.precombine.field' = 'ETL_DT',
'write.tasks' = '4',
'index.bootstrap.enabled' = 'true',
'write.insert.drop.duplicates'='true',
'compaction.tasks' = '4',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'time_elapsed',
'compaction.delta_seconds' = '1200',
'changelog.enabled' = 'true',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://pld3cwztmg01:9083',
--'hive_sync.jdbc_url' = 'jdbc:hive2://pld3cwztmg01:10000',
'hive_sync.table' = 'ZHANG',
'hive_sync.db' = 'hudi_ods_sap',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);

第三步:把kafka表写入到hudi表即可

insert into HUDI_ZHANG select * from HUDI_KAFKA_DEBEZIUM_ZHANG where RCLNT = '300'; 

以上就是从kafka读取数据写入到hudi表,且表类型是MOR。


http://www.kler.cn/a/300746.html

相关文章:

  • ASP.NET Core - 配置系统之配置添加
  • Linux SUID提权
  • vue自适应高度(缩放浏览器)
  • 数仓建模:如何设计可扩展性较好的同环比计算模型?
  • 如何在Mac上使用Brew更新Cursor应用程序
  • excel仅复制可见单元格,仅复制筛选后内容
  • vue 使用jszip,file-saver下载压缩包,自定义文件夹名,文件名打包下载为zip压缩包文件,全局封装公共方法使用。
  • webctf
  • XML 保存 显示XML 方式 encoding=“UTF8“
  • OpenHarmony鸿蒙( Beta5.0)智能窗户通风设备开发详解
  • 笔记:Centos Jdk Nginx 安装包安装命令
  • 【Django】Django REST Framework接口实现详解:从APIView到ModelViewSet
  • 在 PyTorch 中,除了 pad_sequence 还有哪些其他处理序列数据的函数?时间序列数据 预处理
  • [项目][WebServer][TcpServer]详细讲解
  • [计算机网络]-计网学习笔记-计网知识点总结(附完整笔记)
  • C++自学笔记35(文件操作)
  • 抖音视频素材哪里来的?抖音视频素材库在哪里找分享
  • Vue 常用语法
  • 【springboot】简易模块化开发项目整合MyBatis-plus
  • [图解]建模实例-14是哪边的流程有问题
  • 代码随想录算法训练营day32
  • 4WRA6E07-2X/G24N9K4/VL配套HE-SP2比例放大器
  • 53 - I. 在排序数组中查找数字 I
  • Trinamic医疗成功的事例之TMCM611
  • QSoundEffect 用于播放一些单调简单的声音
  • Python画笔案例-045 绘制渐变圆盘