当前位置: 首页 > article >正文

sparksql的Transformation与 Action操作

Transformation操作

与RDD类似的操作

map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、

distinct、dropDuplicates、describe,而以上这些都是企业中比较常用的,这里在一个文件中统一论述

val df1 = spark.read.json("src/main/resources/people.json")
// 使用map去除某些字段
df1.map(row => row.getAs[Long](1)).withColumnRenamed("value","age").show()
//df1.map(row => row.getAs[String]("address")).show()
//df1.map(row => row.getString[String](0)).show()

// randomSplit,按照数组中的权重将数据集划分为不同的比例,可用于机器学习
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

// 取10行数据生成新的DataSet
val df3 = df1.limit(5).show()

// distinct,去重
val df4 = df1.union(df1)
df4.distinct.count

// 这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
df4.dropDuplicates.show
// 传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
//def dropDuplicates(colNames: Seq[String])
df4.dropDuplicates("name", "age").show
df4.dropDuplicates("name").show

// 返回全部列的统计(count、mean、stddev、min、max)
df4.describe().show

// 返回指定列的统计
df4.describe("age").show
df4.describe("name", "age").show
存储相关

persist、checkpoint、unpersist、cache

备注:Dataset 默认的存储级别是 MEMORY_AND_DISK

val df1 = spark.read.json("src/main/resources/people.json")
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("src/main/resources/data/checkpoint")
df1.show()
df1.checkpoint()

// 默认的存储级别是MEMORY_AND_DISK
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
println(df1.count())
df1.unpersist(true
select相关

列的多种表示:select、selectExpr、drop、withColumn、withColumnRenamed、cast(内置函数)

import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")
// 列的多种表示方法。使用'、""、$""、col()、df("")
// 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的地方都有效
df1.select('name, 'age, 'address).show
df1.select("name", "age", "address").show
df1.select($"name", $"age", $"address").show
df1.select(col("name"), col("age"), col("address")).show
df1.select(df1("name"), df1("age"), df1("address")).show

// 下面的写法无效并且会报错
// df1.select("name", "age"+10, "address").show
// df1.select("name", "age+10", "address").show

// 这样写才符合语法
df1.select($"name", $"age"+10, $"address").show
df1.select('name, 'age+10, 'address).show

// 可使用expr表达式(expr里面只能使用引号)
df1.select(expr("name"), expr("age+100"), expr("address")).show
df1.selectExpr("name as ename").show
df1.selectExpr("power(age, 2)", "address").show
df1.selectExpr("round(age, -3) as newAge", "name", "address").show

// drop、withColumn、 withColumnRenamed、casting
// drop 删除一个或多个列,得到新的DF
df1.drop("name")
df1.drop("name", "age")

// withColumn,修改列值
val df2 = df1.withColumn("age", $"age"+10)
df2.show

// withColumnRenamed,更改列名
df1.withColumnRenamed("name", "ename")
// 备注:drop、withColumn、withColumnRenamed返回的是DF

// 类型转化的两种方式
df1.selectExpr("cast(age as string)").printSchema
import org.apache.spark.sql.types._
df1.select('age.cast(StringType)).printSchema
 where 相关的
val df1 = spark.read.json("src/main/resources/people.json")
// 过滤操作
df1.filter("age>30").show
df1.filter("age>30 and name=='Tom'").show
// 底层调用的就是filter算子
df1.where("age>30").show
df1.where("age>30 and name=='Tom'").show
groupBy相关的

groupBy、agg、max、min、avg、sum、count(后面5个为内置函数)

import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")

// 内置的sum max min avg count
df1.groupBy("address").sum("age").show
df1.groupBy("address").max("age").show
df1.groupBy("address").min("age").show
df1.groupBy("address").avg("age").show
df1.groupBy("address").count.show

// 类似having子句
df1.groupBy("address").avg("age").where("avg(age) > 20").show
df1.groupBy("address").avg("age").where($"avg(age)" > 20).show

// agg
df1.groupBy("address").agg("age"->"max", "age"->"min", "age"->"avg", "age"->"sum", "age"->"count").show

// 这种方式更好理解
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),
  sum("age"), count("age")).show

// 给列取别名
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),
  sum("age"), count("age")).withColumnRenamed("min(age)",
  "minAge").show

// 给列取别名,最简便
df1.groupBy("address").agg(max("age").as("maxAge"),
  min("age").as("minAge"), avg("age").as("avgAge"),
  sum("age").as("sumAge"), count("age").as("countAge")).show
 orderBy相关的
import spark.implicits.
val df1 = spark.read.json("src/main/resources/people.json")

// sort,以下语句等价
df1.sort("age").show
df1.sort($"age").show
df1.sort($"age".asc).show
df1.sort($"age".desc).show
df1.sort(-$"age").show
df1.sort(-'age, -'name).show

// orderBy,底层调用的还是sort
df1.orderBy("age").show
join相关的

目前 Apache Spark 3.x 版本中,一共支持以下七种 Join 类型:

INNER JOIN

CROSS JOIN

LEFT OUTER JOIN

RIGHT OUTER JOIN

FULL OUTER JOIN

LEFT SEMI JOIN

LEFT ANTI JOIN

在实现上,这七种 Join 对应的实现类分别如下:

object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

准备数据

    // 准备数据
    val order = spark.sparkContext.parallelize(
      Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))
    ).toDF("paymentId", "customerId", "amount")
    val customer = spark.sparkContext.parallelize(
      Seq((101, "ds"), (102, "ds_hadoop"), (103, "ds001"), (104, "ds002"), (105, "ds003"), (106, "ds004"))
    ).toDF("customerId", "name")

order 表

customer表

 INNER JOIN

在 Spark 中,如果没有指定任何 Join 类型,那么默认就是 INNER JOIN。INNER JOIN 只会返回满足 Join 条件( join condition)的数据,这个在企业中用的应该比较多,具体如下: 

    // inner join
    // 单字段关联
    customer.join(order,"customerId").show
    // 多字段关联  Seq(“customerId”, “name”)
    customer.join(order,Seq("customerId")).show

执行结果

CROSS JOIN

这种类型的 Join 也称为笛卡儿积(Cartesian Product),Join 左表的每行数据都会跟右表的每行数据进行 Join,产生的结果行数为 m*n,所以在生产环境下尽量不要用这种 Join。下面是 CROSS JOIN 的使用例子:

    // cross join
    // 笛卡尔积
    customer.crossJoin(order).show()

    // 如果两张表出现相同的字段,可以使用下面的方式进去筛选  类似customer.name  order.amount
    customer.crossJoin(order)
      .select(customer("name"), order("amount") )
      .show

执行1结果

执行2结果,只显示select的字段

 LEFT OUTER JOIN

LEFT OUTER JOIN 等价于 LEFT JOIN,这个 Join 的返回的结果相信大家都知道,我就不介绍了。下面三种写法都是等价的

    // 俩个表关联字段名一致
    customer.join(order, Seq("customerId"), "left_outer").show
    customer.join(order, Seq("customerId"), "leftouter").show
    customer.join(order, Seq("customerId"), "left").show


    val order2 = spark.sparkContext.parallelize(
      Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))
    ).toDF("paymentId", "custId", "amount")

    // 如果两张表使用不同的字段进行关联的话,要使用三等号即===
    customer.join(order2, $"customerId"===$"custId", "left").show

执行结果

RIGHT OUTER JOIN

和 LEFT OUTER JOIN 类似,RIGHT OUTER JOIN 等价于 RIGHT JOIN,下面三种写法也是等价的:

    order.join(customer, Seq("customerId"), "right").show
    order.join(customer, Seq("customerId"), "right_outer").show
    order.join(customer, Seq("customerId"), "rightouter").show

FULL OUTER JOIN

FULL OUTER JOIN 的含义大家应该也都熟悉,会将左右表的数据全部显示出来。FULL OUTER JOIN 有以下四种写法:

    order.join(customer, Seq("customerId"), "outer").show
    order.join(customer, Seq("customerId"), "full").show
    order.join(customer, Seq("customerId"), "full_outer").show
    order.join(customer, Seq("customerId"), "fullouter").show
LEFT SEMI JOIN

LEFT SEMI JOIN 只会返回匹配右表的数据,而且 LEFT SEMI JOIN 只会返回左表的数据,右表的数据是不会显示的,下面三种写法都是等价的

order.join(customer, Seq("customerId"), "leftsemi").show
order.join(customer, Seq("customerId"), "left_semi").show
order.join(customer, Seq("customerId"), "semi").show

LEFT SEMI JOIN 其实可以用 IN/EXISTS 来改写

select * from order where customerId in (select customerId from customer)

LEFT ANTI JOIN

与 LEFT SEMI JOIN 相反,LEFT ANTI JOIN 只会返回没有匹配到右表的左表数据。而且下面三种写法也是等效的

order.join(customer, Seq("customerId"), "leftanti").show
order.join(customer, Seq("customerId"), "left_anti").show
order.join(customer, Seq("customerId"), "anti").show

LEFT SEMI JOIN 其实可以用 NOT IN/EXISTS 来改写

select * from order where customerId not in (select customerId from customer)

 集合相关的

union、unionAll、intersect、except

main{

    val lst = List(StudentAge(1, "Alice", 18),
      StudentAge(2, "Andy", 19),
      StudentAge(3, "Bob", 17),
      StudentAge(4, "Justin", 21),
      StudentAge(5, "Cindy", 20)
    )
    val ds1 = spark.createDataset(lst)
    ds1.show()

    val rdd = spark.sparkContext.makeRDD(List(StudentHeight("Alice", 160),
      StudentHeight("Andy", 159),
      StudentHeight("Bob", 170),
      StudentHeight("Cindy", 165),
      StudentHeight("Rose", 160))
    )
    val ds2 = rdd.toDS
    // union、unionAll、intersect、except。集合的交、并、差
    val ds3 = ds1.select("name")
    val ds4 = ds2.select("sname")

    // union 求并集,不去重  去重使用distinct
    ds3.union(ds4).show
    // 底层依旧调用的是union

    ds3.unionAll(ds4).show

    // intersect 求交
    ds3.intersect(ds4).show

    // except 求差
    ds3.except(ds4).show


}


  // 定义第一个数据集
  case class StudentAge(sno: Int, name: String, age: Int)

  // 定义第二个数据集
  case class StudentHeight(sname: String, height: Int)

交集

差集

空值处理

na.fill、na.drop、na.replace、na.filter

    import spark.implicits._
    import org.apache.spark.sql.functions._
    val df1 = spark.read.json("src/main/resources/data/people.json")

    // NA表示缺失值,即“Missing value”,是“not available”的缩写
    // 删出含有空值的行
    df1.na.drop.show
    // 删除某列的空值和null
    df1.na.drop(Array("age")).show
    // 对全部列填充
    df1.na.fill("NULL").show
    // 对指定单列填充;对指定多列填充
    df1.na.fill("NULL", Array("address")).show
    df1.na.fill(Map("age" -> 0, "address" -> "NULL")).show

    // 对指定的值进行替换
    df1.na.replace(Array("address"), Map("NULL" -> "Shanghai"))
      .na.replace(Array("age"), Map(0 -> 100))
      .show


    // 查询空值列或非空值列。isNull、isNotNull为内置函数
    df1.where("address is null").show
    df1.where($"address".isNull).show
    df1.where(col("address").isNull).show
    df1.filter("address is not null").show
    df1.filter(col("address").isNotNull).show

 Action操作

与RDD类似的操作

show、 collect、 collectAsList、 head、 first、 count、 take、 takeAsList、 reduce

    // 隐式转换
    import spark.implicits._
    // show:显示结果,默认显示20行,截取(true)
    spark.read.json("src/main/resources/data/people.json").show(100, false)

    val df = spark.read.json("src/main/resources/data/people.json")
    println(df.count())

    // 输出数组arr   9
    df.collect().foreach(println)

    // 输出list
    df.collectAsList().forEach(println)

    // 输出head3条 输出数组1
    df.head(3).foreach(println)
    println(df.head(3))

    // 输出第一条 head(1)
    println(df.first())

    // 底层调用的就是head  输出数组3
    df.take(3).foreach(println)
    // 底层调用take,再调用head
    df.takeAsList(3).forEach(println)
 获取结构属性的操作

printSchema、explain、columns、dtypes、col

    val df1 = spark.read.json("src/main/resources/data/people.json")
    // 结构属性
    df1.columns.foreach(println) // 查看列名  address,age,name
    df1.dtypes.foreach(println) // 查看列名和类型  (address,StringType) (age,LongType) (name,StringType)
    df1.explain() // 参看执行计划
    println(df1.col("name")) // 获取某个列
    df1.printSchema // 常用


http://www.kler.cn/a/591840.html

相关文章:

  • Redis 小记
  • LeetCode[42] 接雨水
  • HarmonyOS开发,A持有B,B引用A的场景会不会导致内存泄漏,代码示例告诉你答案
  • Ext系列文件系统
  • 全网首创/纯Qt/C++实现国标GB28181服务/实时视频/云台控制/预置位/录像回放和下载/事件订阅/语音对讲
  • 飞腾2000+/64核加固服务器
  • ruoyi-vue部署
  • 虚幻基础:组件组件通信
  • PreparedStatement:Java 数据库操作的安全与高效之道
  • STM32---FreeRTOS任务通知
  • SpringBoot实现发邮件功能+邮件内容带模版
  • 深入浅出DBSCAN:基于密度的聚类算法
  • 华为营销流程落地方案:MTC=MTL+LTC
  • 删除排序链表中的重复元素(js实现,LeetCode:83)
  • C++ —— 时间操作 chrono 库
  • DeepLearning:卷积神经网络基础补充
  • python实现接口自动化
  • Paper Reading: AnomalyGPT:利用大型视觉-语言模型检测工业异常 (AAAI 2024 Oral)
  • 20. Excel 自动化:Excel 对象模型
  • Springboot中的@ConditionalOnBean注解:使用指南与最佳实践