Spark SQL DSL
1、 Spark sql -- 代替hive的(并非完全代替)
(1) Spark sql 和 hive 区别 :
两者都是写sql的,区别是计算引擎不一样
hive -- 计算引擎是MapReduce ,是通过MR做计算的
Spark sql -- 计算引擎是Saprk Core,是通过Spark Core做计算的
Spark sql 功能比 hive 强大 : 并非只能写sql
hive只能在shell行写sql
spark可以在代码中写sql
(2) Spark sql结构 :
1、 Data Source API(读数据) : 可以读取 csv(文本文件)、 json、 jdbc 等各种各样的数据做处理
2、 Data Frame API(提供了两个API):
Dataframe DSL -- 写代码 (DSL : 类SQL语法,与SQL差不多,但它是代码)
Spark SQL and HQL -- 写SQL
(3) DataFrame : 数据框(二维的表结构,类似hive的一张表)
写SQL的前提 : 有表
DataFrame 是基于 RDD 做了封装, 在上面提供了 列名和列类型 的概念,即表的结构的概念。
可以基于 DataFrame 去写 SQL 。
2、 写Spark SQL :
在spark sql中, shuffle之后分区数不是由前面的RDD决定的,而是有默认值, 默认200个。 可以指定参数修改。
(1) 导入Spark SQL依赖 -- 在Spark项目的pom文件中加入
<!-- Spark sql核心依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
(2) 在Spark项目下创建sql包 -- 新的模块一定要新建新的包
*项目名称一定要小写,多个单词之间用-分割 : s1-v1_1.2
*包名也要小写,一般是公司域名倒写 : com.shujia.spark
(3) 创建Spark sql环境 :
val spark: SparkSession = SparkSession
.builder() // 构建
.appName("wordCount")
.master("local")
// 设置 sparkSQL 在 shuffle 之后 DF 的分区数,默认是200
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate() // 当前环境有SparkSession就获取, 反之则创建
(4) 返回值不再是 RDD, 而是 DataFrame (DF)
查看数据不再是 foreach(), 而是 show()
(5) 针对于sql语句有多行的情况, 可以使用 """ """ 格式书写
val wordCountDF = spark.sql(
"""
|select word,count(1) as c from (
|select explode(split(line,',')) word from lines
|) as d
|group by word
|""".stripMargin) // stripMargin : 删除"|" 并合并以上sql语句
(6) 创建 DataFrame 的方式:
1、 读取 csv 格式的数据创建 DF
val studentDF: DataFrame = spark
.read
.format("csv")
.option("sep", ",") //列的分割方式
.schema("id STRING, name STRING, age INT, gender STRING, clazz STRING") // 指定字段名和字段类型, 必须按照数据顺序指定
.load("data/students.txt") //指定读取的路径
2、 读取 json 格式的数据构建 DF
(spark 会自动解析json格式)
val studentJsonDF: DataFrame = spark
.read
.format("json")
.load("data/a.json")
3、 读取 jdbc 数据构建 DF
(通过网络远程读取 mysql 中的数据, 需要添加mysql依赖)
val jdbcDF: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306")
.option("dbtable", "bigdata.students")
.option("user", "root")
.option("password", "123456")
.load()
4、 读取 parquet 格式的数据构建 DF
(parquet格式的数据中自带 列名 和 列类型,
parquet会对数据进行压缩, 体积变小, 解压和压缩需要时间)
// 保存一个parquet格式的文件
studentDF
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.save("data/parquet")
// 读取parquet格式的数据
val parquetDF: DataFrame = spark
.read
.format("parquet")
.load("data/parquet")
3、 DSL语法 -- 类sql语法
// spark sql 中必须要导入隐式转换, 才可以使用 $方法 获取列对象
import spark.implicits._
//导入 DSL 所有的函数
import org.apache.spark.sql.functions._
(1) show : 查看前面20条数据, 相当于action算子
action算子 -- 每一个Action算子都会触发一个job
(2) select : 选择字段, 和 sql 中 select 是一样
(3) $ : 是一个方法,作用是通过列名获取列的对象
studentDF.select($"id", $"age" + 2 as "age").show()
(4) where : 过滤数据
= : 赋值 == : 判断 === : 等于
(5) group by : 分组
(6) agg : 分组之后进行聚合计算
只能在分组后使用, 即一般跟在group函数后面
studentDF
.groupBy($"clazz")
// 分组之后做聚合计算 -- 可以写多个
.agg(count($"clazz") as "c", avg($"age") as "avgAge")
.show()
(7) join : 表关联
(8) 开窗函数 -- 统计每个班级总分前2的学生
withColumn : 给 DF 增加新的列
joinDF
// 按照 id 和 班级 分组
.groupBy($"id", $"clazz")
// 对分数求和
.agg(sum($"sco") as "sumSco")
// 使用开窗函数 -- row_number() over (partition by clazz order by sumSco desc)
// .select($"id", $"clazz", $"sumSco", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")
// 在前面 DF 的基础上增加列 ( 上面的简写, 省去写 $"id", $"clazz", $"sumSco" 步骤, 直接将 "r" 加在 "sumSco" 后面 )
.withColumn("r", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
// 取 班级前2
.where($"r" <= 2).show()
(9) orderBy : 排序
DSL 语法 与 SQL 的异同 :
1、 DSL 和 SQL 功能相同, 但写法不同, 代码更简洁
2、 DSL 不需要做 子查询