sparksql的Transformation与 Action操作
Transformation操作
与RDD类似的操作
map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、
distinct、dropDuplicates、describe,而以上这些都是企业中比较常用的,这里在一个文件中统一论述
val df1 = spark.read.json("src/main/resources/people.json")
// 使用map去除某些字段
df1.map(row => row.getAs[Long](1)).withColumnRenamed("value","age").show()
//df1.map(row => row.getAs[String]("address")).show()
//df1.map(row => row.getString[String](0)).show()
// randomSplit,按照数组中的权重将数据集划分为不同的比例,可用于机器学习
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count
// 取10行数据生成新的DataSet
val df3 = df1.limit(5).show()
// distinct,去重
val df4 = df1.union(df1)
df4.distinct.count
// 这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
df4.dropDuplicates.show
// 传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
//def dropDuplicates(colNames: Seq[String])
df4.dropDuplicates("name", "age").show
df4.dropDuplicates("name").show
// 返回全部列的统计(count、mean、stddev、min、max)
df4.describe().show
// 返回指定列的统计
df4.describe("age").show
df4.describe("name", "age").show
存储相关
persist、checkpoint、unpersist、cache
备注:Dataset 默认的存储级别是 MEMORY_AND_DISK
val df1 = spark.read.json("src/main/resources/people.json")
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("src/main/resources/data/checkpoint")
df1.show()
df1.checkpoint()
// 默认的存储级别是MEMORY_AND_DISK
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
println(df1.count())
df1.unpersist(true
select相关
列的多种表示:select、selectExpr、drop、withColumn、withColumnRenamed、cast(内置函数)
import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")
// 列的多种表示方法。使用'、""、$""、col()、df("")
// 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的地方都有效
df1.select('name, 'age, 'address).show
df1.select("name", "age", "address").show
df1.select($"name", $"age", $"address").show
df1.select(col("name"), col("age"), col("address")).show
df1.select(df1("name"), df1("age"), df1("address")).show
// 下面的写法无效并且会报错
// df1.select("name", "age"+10, "address").show
// df1.select("name", "age+10", "address").show
// 这样写才符合语法
df1.select($"name", $"age"+10, $"address").show
df1.select('name, 'age+10, 'address).show
// 可使用expr表达式(expr里面只能使用引号)
df1.select(expr("name"), expr("age+100"), expr("address")).show
df1.selectExpr("name as ename").show
df1.selectExpr("power(age, 2)", "address").show
df1.selectExpr("round(age, -3) as newAge", "name", "address").show
// drop、withColumn、 withColumnRenamed、casting
// drop 删除一个或多个列,得到新的DF
df1.drop("name")
df1.drop("name", "age")
// withColumn,修改列值
val df2 = df1.withColumn("age", $"age"+10)
df2.show
// withColumnRenamed,更改列名
df1.withColumnRenamed("name", "ename")
// 备注:drop、withColumn、withColumnRenamed返回的是DF
// 类型转化的两种方式
df1.selectExpr("cast(age as string)").printSchema
import org.apache.spark.sql.types._
df1.select('age.cast(StringType)).printSchema
where 相关的
val df1 = spark.read.json("src/main/resources/people.json")
// 过滤操作
df1.filter("age>30").show
df1.filter("age>30 and name=='Tom'").show
// 底层调用的就是filter算子
df1.where("age>30").show
df1.where("age>30 and name=='Tom'").show
groupBy相关的
groupBy、agg、max、min、avg、sum、count(后面5个为内置函数)
import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")
// 内置的sum max min avg count
df1.groupBy("address").sum("age").show
df1.groupBy("address").max("age").show
df1.groupBy("address").min("age").show
df1.groupBy("address").avg("age").show
df1.groupBy("address").count.show
// 类似having子句
df1.groupBy("address").avg("age").where("avg(age) > 20").show
df1.groupBy("address").avg("age").where($"avg(age)" > 20).show
// agg
df1.groupBy("address").agg("age"->"max", "age"->"min", "age"->"avg", "age"->"sum", "age"->"count").show
// 这种方式更好理解
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),
sum("age"), count("age")).show
// 给列取别名
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),
sum("age"), count("age")).withColumnRenamed("min(age)",
"minAge").show
// 给列取别名,最简便
df1.groupBy("address").agg(max("age").as("maxAge"),
min("age").as("minAge"), avg("age").as("avgAge"),
sum("age").as("sumAge"), count("age").as("countAge")).show
orderBy相关的
import spark.implicits.
val df1 = spark.read.json("src/main/resources/people.json")
// sort,以下语句等价
df1.sort("age").show
df1.sort($"age").show
df1.sort($"age".asc).show
df1.sort($"age".desc).show
df1.sort(-$"age").show
df1.sort(-'age, -'name).show
// orderBy,底层调用的还是sort
df1.orderBy("age").show
join相关的
目前 Apache Spark 3.x 版本中,一共支持以下七种 Join 类型:
INNER JOIN
CROSS JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
FULL OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
在实现上,这七种 Join 对应的实现类分别如下:
object JoinType {
def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
case "inner" => Inner
case "outer" | "full" | "fullouter" => FullOuter
case "leftouter" | "left" => LeftOuter
case "rightouter" | "right" => RightOuter
case "leftsemi" | "semi" => LeftSemi
case "leftanti" | "anti" => LeftAnti
case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter", "full_outer",
"leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi", "semi",
"leftanti", "left_anti", "anti",
"cross")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
}
}
准备数据
// 准备数据
val order = spark.sparkContext.parallelize(
Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))
).toDF("paymentId", "customerId", "amount")
val customer = spark.sparkContext.parallelize(
Seq((101, "ds"), (102, "ds_hadoop"), (103, "ds001"), (104, "ds002"), (105, "ds003"), (106, "ds004"))
).toDF("customerId", "name")
order 表
customer表
INNER JOIN
在 Spark 中,如果没有指定任何 Join 类型,那么默认就是 INNER JOIN。INNER JOIN 只会返回满足 Join 条件( join condition)的数据,这个在企业中用的应该比较多,具体如下:
// inner join
// 单字段关联
customer.join(order,"customerId").show
// 多字段关联 Seq(“customerId”, “name”)
customer.join(order,Seq("customerId")).show
执行结果
CROSS JOIN
这种类型的 Join 也称为笛卡儿积(Cartesian Product),Join 左表的每行数据都会跟右表的每行数据进行 Join,产生的结果行数为 m*n,所以在生产环境下尽量不要用这种 Join。下面是 CROSS JOIN 的使用例子:
// cross join
// 笛卡尔积
customer.crossJoin(order).show()
// 如果两张表出现相同的字段,可以使用下面的方式进去筛选 类似customer.name order.amount
customer.crossJoin(order)
.select(customer("name"), order("amount") )
.show
执行1结果
执行2结果,只显示select的字段
LEFT OUTER JOIN
LEFT OUTER JOIN 等价于 LEFT JOIN,这个 Join 的返回的结果相信大家都知道,我就不介绍了。下面三种写法都是等价的
// 俩个表关联字段名一致
customer.join(order, Seq("customerId"), "left_outer").show
customer.join(order, Seq("customerId"), "leftouter").show
customer.join(order, Seq("customerId"), "left").show
val order2 = spark.sparkContext.parallelize(
Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))
).toDF("paymentId", "custId", "amount")
// 如果两张表使用不同的字段进行关联的话,要使用三等号即===
customer.join(order2, $"customerId"===$"custId", "left").show
执行结果
RIGHT OUTER JOIN
和 LEFT OUTER JOIN 类似,RIGHT OUTER JOIN 等价于 RIGHT JOIN,下面三种写法也是等价的:
order.join(customer, Seq("customerId"), "right").show
order.join(customer, Seq("customerId"), "right_outer").show
order.join(customer, Seq("customerId"), "rightouter").show
FULL OUTER JOIN
FULL OUTER JOIN 的含义大家应该也都熟悉,会将左右表的数据全部显示出来。FULL OUTER JOIN 有以下四种写法:
order.join(customer, Seq("customerId"), "outer").show
order.join(customer, Seq("customerId"), "full").show
order.join(customer, Seq("customerId"), "full_outer").show
order.join(customer, Seq("customerId"), "fullouter").show
LEFT SEMI JOIN
LEFT SEMI JOIN 只会返回匹配右表的数据,而且 LEFT SEMI JOIN 只会返回左表的数据,右表的数据是不会显示的,下面三种写法都是等价的
order.join(customer, Seq("customerId"), "leftsemi").show
order.join(customer, Seq("customerId"), "left_semi").show
order.join(customer, Seq("customerId"), "semi").show
LEFT SEMI JOIN 其实可以用 IN/EXISTS 来改写
select * from order where customerId in (select customerId from customer)
LEFT ANTI JOIN
与 LEFT SEMI JOIN 相反,LEFT ANTI JOIN 只会返回没有匹配到右表的左表数据。而且下面三种写法也是等效的
order.join(customer, Seq("customerId"), "leftanti").show
order.join(customer, Seq("customerId"), "left_anti").show
order.join(customer, Seq("customerId"), "anti").show
LEFT SEMI JOIN 其实可以用 NOT IN/EXISTS 来改写
select * from order where customerId not in (select customerId from customer)
集合相关的
union、unionAll、intersect、except
main{
val lst = List(StudentAge(1, "Alice", 18),
StudentAge(2, "Andy", 19),
StudentAge(3, "Bob", 17),
StudentAge(4, "Justin", 21),
StudentAge(5, "Cindy", 20)
)
val ds1 = spark.createDataset(lst)
ds1.show()
val rdd = spark.sparkContext.makeRDD(List(StudentHeight("Alice", 160),
StudentHeight("Andy", 159),
StudentHeight("Bob", 170),
StudentHeight("Cindy", 165),
StudentHeight("Rose", 160))
)
val ds2 = rdd.toDS
// union、unionAll、intersect、except。集合的交、并、差
val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")
// union 求并集,不去重 去重使用distinct
ds3.union(ds4).show
// 底层依旧调用的是union
ds3.unionAll(ds4).show
// intersect 求交
ds3.intersect(ds4).show
// except 求差
ds3.except(ds4).show
}
// 定义第一个数据集
case class StudentAge(sno: Int, name: String, age: Int)
// 定义第二个数据集
case class StudentHeight(sname: String, height: Int)
交集
差集
空值处理
na.fill、na.drop、na.replace、na.filter
import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/data/people.json")
// NA表示缺失值,即“Missing value”,是“not available”的缩写
// 删出含有空值的行
df1.na.drop.show
// 删除某列的空值和null
df1.na.drop(Array("age")).show
// 对全部列填充
df1.na.fill("NULL").show
// 对指定单列填充;对指定多列填充
df1.na.fill("NULL", Array("address")).show
df1.na.fill(Map("age" -> 0, "address" -> "NULL")).show
// 对指定的值进行替换
df1.na.replace(Array("address"), Map("NULL" -> "Shanghai"))
.na.replace(Array("age"), Map(0 -> 100))
.show
// 查询空值列或非空值列。isNull、isNotNull为内置函数
df1.where("address is null").show
df1.where($"address".isNull).show
df1.where(col("address").isNull).show
df1.filter("address is not null").show
df1.filter(col("address").isNotNull).show
Action操作
与RDD类似的操作
show、 collect、 collectAsList、 head、 first、 count、 take、 takeAsList、 reduce
// 隐式转换
import spark.implicits._
// show:显示结果,默认显示20行,截取(true)
spark.read.json("src/main/resources/data/people.json").show(100, false)
val df = spark.read.json("src/main/resources/data/people.json")
println(df.count())
// 输出数组arr 9
df.collect().foreach(println)
// 输出list
df.collectAsList().forEach(println)
// 输出head3条 输出数组1
df.head(3).foreach(println)
println(df.head(3))
// 输出第一条 head(1)
println(df.first())
// 底层调用的就是head 输出数组3
df.take(3).foreach(println)
// 底层调用take,再调用head
df.takeAsList(3).forEach(println)
获取结构属性的操作
printSchema、explain、columns、dtypes、col
val df1 = spark.read.json("src/main/resources/data/people.json")
// 结构属性
df1.columns.foreach(println) // 查看列名 address,age,name
df1.dtypes.foreach(println) // 查看列名和类型 (address,StringType) (age,LongType) (name,StringType)
df1.explain() // 参看执行计划
println(df1.col("name")) // 获取某个列
df1.printSchema // 常用