当前位置: 首页 > article >正文

【Apache Paimon】-- 4 -- Flink 消费 kafka 数据,然后写入 paimon

目录

1、本地开发环境

2、kafka2paimon 实现流程

3、代码实现

3.1、项目名称

3.2、项目结构

3.3、Pom.xml 和 log4j.properties 文件

3.4、代码核心类

3.4.1、入口类:Kafka2PaimonDemo.java

3.4.2、参数解析类

3.4.2.1、JobParameterUtil.java( flink job scheduler )

3.4.2.2、KafkaSourceParametersUtil.java(kafka source 参数)

3.4.2.3、OSSParametersUtils.java (oss 参数)

3.4.2.4、PaimonCatalogParameterUtils.java(paimon catalog 参数)

3.4.2.5、PaimonTableParameterUtils.java (paimon table 参数)

3.4.3、flink table env 类:FlinkTableInitUtils.java

3.4.4、配置类

3.4.4.1、DefaultConnectorVal.java

3.4.4.2、DefaultFlinkConfigVal.java

3.4.4.3、ParameterConfigs.java

3.4.4.4、PropertiesConstants.java

3.4.5、bean类

3.4.5.1、PaimonFileSystemCatalogInfo.java

3.4.5.2、JobSchedulerInfo.java

3.4.5.3、OSSGlobalVar.java

3.4.5.4、OSSInfo.java

3.4.5.5、PaimonPrimaryKeyTableSinkInfo.java

3.4.5.6、KafkaSourceInfo.java

3.4.6、重写 OSSFileSystemFactory.java

3.5、运行核心类的步骤

3.5.1、通过本地 kafka shell 生产数据到 topic:test_paimon中

​编辑

3.5.2、编辑 main class 的 args

3.5.4、运行 Kafka2PaimonDemo.java,本地访问 flink web-ui

4、查询 oss 结果

4.1 paimon 表

4.2 flink checkpoint/savepoint 存储

5、参考


1、本地开发环境

Mac OS 10.15.6
Oracle Jdk 11
Scala 2.12.17
Intellij Idea 2023.1
阿里云 OSS

scala 包和 jdk 包下载:

链接:https://pan.baidu.com/s/1HSkoUmzpbFcTx3aB9wte6w?pwd=81jc 
提取码: 81jc

maven pom 核心依赖包:

<apache.flink.version>1.19.1</apache.flink.version>
<apache.paimon.version>0.9.0</apache.paimon.version>
<flink-kafka.version>3.3.0-1.19</flink-kafka.version>
<aliyun.oss.version>3.17.2</aliyun.oss.version>
<fs.hadoopshaded.version>3.3.0</fs.hadoopshaded.version>
<fastjson.version>1.2.83</fastjson.versi

http://www.kler.cn/a/429411.html

相关文章:

  • Linux如何安装discuz
  • docker安装Emqx并使用自签名证书开启 SSL/TLS 连接
  • 数据库之连接池Druid
  • ZZCMS2023存在跨站脚本漏洞(CNVD-2024-44822、CVE-2024-44818)
  • sock_poll内核函数
  • No module named ‘_ssl‘ No module named ‘_ctypes‘
  • 如何防范顶级应用程序安全威胁
  • 【大语言模型】LangChain ModelsIO与Models I/O Promopts详解
  • 【CKS最新模拟真题】Dockerfile修改
  • CTF-RE/WEB: python-Hook(钩子)
  • 电子信息工程自动化 基于单片机的居室安全报警系统
  • 为什么 JavaScript 中的回调函数未按顺序执行?
  • Pydantic 动态字段:使用和不使用 `@computed_field` 的对比指南
  • 如何使用 JavaScript 获取页面滚动位置?
  • Java项目实战II基于微信小程序的跑腿系统(开发文档+数据库+源码)
  • Hasura 中间件API go操作示例
  • 专为高性能汽车设计的Armv9架构的Neoverse V3AE CPU基础知识与软件编码特性解析
  • 管理系统前端框架开发案例学习
  • redis-stack redisSearch环境安装搭建
  • 记录一下,解决js内存溢出npm ERR! code ELIFECYCLEnpm ERR! errno 134 以及 errno 9009