
CDP Hudi in Practice: Integrating with Spark

[1] Using spark-shell

1 - Distribute the Hudi jar

[root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar   cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 418.2MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 304.8MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 365.0MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 406.1MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 472.7MB/s   00:00
hudi-spark3.4-bundle_2.12-1.0.0.jar                                                                                                                                                                                          100%  105MB 447.1MB/s   00:00
[root@cdp73-1 ~]#

2 - Launch spark-shell

spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
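
If you are integrating Hudi into a standalone Spark application rather than spark-shell, the same settings can be applied when building the session. A minimal sketch, assuming the Hudi bundle jar is already on the application classpath:

// Sketch: programmatic equivalent of the spark-shell flags above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-quickstart")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .getOrCreate()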

3 - Initialize the session

// spark-shell
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

4 - Create the table

There is no separate create step: if the table does not already exist under the configured base path, the first commit (the insert in step 5) initializes it automatically.
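
As a quick sanity check, after that first write the base path should contain the `.hoodie` metadata directory that Hudi creates when it initializes the table. A minimal sketch, run in the same spark-shell session:

// Sketch: verify the table was initialized by checking for the .hoodie directory.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.exists(new Path(s"$basePath/.hoodie"))  // should return true after the first commit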

5 - Insert data

// spark-shell
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))

val inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)

[Mapping to Hudi write operations] Hudi provides a range of write operations, both batch and incremental, for writing data into Hudi tables, each with different semantics and performance characteristics. When no record key is configured (see keys below), bulk_insert is chosen as the write operation, which matches the non-default behavior of Spark's Parquet datasource.
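
To avoid relying on that fallback, you can configure a record key explicitly so that later writes are routed as upserts. A sketch using standard Hudi write options; choosing uuid as the record key and ts as the precombine field is an assumption that fits the sample schema:

// Sketch: the same insert, but with an explicit record key and precombine field.
inserts.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)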

6 - Query data

// spark-shell
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")

spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()

7 - Update data

// Read data from the target Hudi table, multiply rider-D's fare by 10, and write it back as an update.
import org.apache.spark.sql.functions.col
val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)

updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.embed.timeline.server", "false").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.table.name", tableName).
  mode(Append).
  save(basePath)
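
To confirm the upsert took effect, re-read the table and check rider-D's fare (a minimal sketch; the original fare was 33.90, so the updated value should be 339.0):

// Sketch: verify the updated fare for rider-D.
spark.read.format("hudi").load(basePath).
  filter($"rider" === "rider-D").
  select("uuid", "rider", "fare").
  show()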

8 - Merge data

// spark-shell
val adjustedFareDF = spark.read.format("hudi").
  load(basePath).limit(2).
  withColumn("fare", col("fare") * 10)
adjustedFareDF.write.format("hudi").
  option("hoodie.datasource.write.payload.class", "com.payloads.CustomMergeIntoConnector").
  mode(Append).
  save(basePath)
// Notice that the fare column has been updated while all other columns remain intact.
spark.read.format("hudi").load(basePath).show()

This step fails with the following error:

Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
	at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:55)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:51)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73)
	at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:165)
	... 28 more
Caused by: java.lang.ClassNotFoundException: com.payloads.CustomMergeIntoConnector
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:53)
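
The ClassNotFoundException is expected as written: com.payloads.CustomMergeIntoConnector is a placeholder from the Hudi quickstart for a user-supplied record payload implementation, and no such class ships in the Hudi bundle. To run this step you would implement your own payload class, package it into a jar, and put that jar on the spark-shell classpath. A sketch, with a hypothetical jar path:

spark-shell --jars /path/to/custom-payloads.jar \
--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'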

