【大数据学习 | Spark-SQL】Spark-SQL编程
上面的是SparkSQL的API操作。
1. 将RDD转化为DataFrame对象
DataFrame:
DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数据集可以用SQL查询。
创建方式
准备数据
1 zhangsan 20 male
2 lisi 30 female
3 wangwu 35 male
4 zhaosi 40 female
toDF方式。
package com.hainiu.spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object TestSparkSql{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test sql")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
//环境对象包装
import sqlSc.implicits._
//引入环境信息
val rdd = sc.textFile("data/a.txt")
.map(t => {
val strs = t.split(" ")
(strs(0).toInt, strs(1), strs(2).toInt)
})
//增加字段信息
val df = rdd.toDF("id", "name", "age")
df.show() //展示表数据
df.printSchema() //展示表格字段信息
}
}
使用样例类定义schema:
object TestSparkSql{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test sql")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
val rdd = sc.textFile("data/a.txt")
.map(t => {
val strs = t.split(" ")
Student(strs(0).toInt, strs(1), strs(2).toInt)
})
// val df = rdd.toDF("id", "name", "age")
val df = rdd.toDF()
df.show() //打印数据,以表格的形式打印数据
df.printSchema() //打印表的结构信息
}
}
case class Student(id:Int,name:String,age:Int)
createDataFrame方式
这种方式需要将rdd和schema信息进行合并,得出一个新的DataFrame对象
package com.hainiu.spark
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestSparkSqlWithCreate {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test create")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
val rdd = sc.textFile("data/a.txt")
.map(t => {
val strs = t.split(" ")
Row(strs(0).toInt, strs(1), strs(2).toInt)
})
// rdd + schema
val schema = StructType(
Array(
StructField("id",IntegerType),
StructField("name",StringType),
StructField("age",IntegerType)
)
)
val df = sqlSc.createDataFrame(rdd, schema)
df.show()
df.printSchema()
}
}
2. SparkSQL的查询方式(推荐第二种写法)
第二个部分关于df的查询
第一种sql api的方式查询
- 使用的方式方法的形式编程
- 但是思想还是sql形式
- 和rdd编程特别相似的一种写法
object TestSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test sql")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
val rdd = sc.textFile("data/a.txt")
.map(t => {
val strs = t.split(" ")
(strs(0).toInt, strs(1), strs(2).toInt,strs(3))
})
val df = rdd.toDF("id", "name", "age","gender")
//select * from student where age >20
//df.where("age >20")
//分组聚合
//df.groupby("gender").sum("age")
//几个问题
//聚合函数不能增加别名 聚合函数不能多次聚合 orderby不识别desc
// df.groupBy("gender").agg(count("id").as("id"),sum("age").as("age")).orderBy($"age".desc)
//字段标识可以是字符串,也可以是字段对象
//df.orderBy($"age".desc)
//df.orderBy(col("age").desc)
//df.orderBy(df("age").desc)
//增加字段对象可以实现高端操作
//df.select($"age".+(1))
//join问题
//val df1 = sc.makeRDD(Array(
// (1,100,98),
// (2,100,95),
// (3,90,92),
//(4,90,93)
//)).toDF("id","chinese","math")
//df.join(df1,"id") //字段相同
//df.join(df1,df("id")===df1("id"))
//窗口函数
//普通函数 聚合函数 窗口函数 sum|count|rowkey over (partition by gender order by age desc)
//按照条件分割完毕进行数据截取
//班级的前两名 每个性别年龄最高的前两个
//select *,row_number() over (partition by gender order by age desc) rn from table
import sqlSc.implicits._
import org.apache.spark.sql.functions._
df.withColumn("rn",row_number().over(Window.partitionBy("gender").orderBy($"age".desc)))
.where("rn = 1")
.show()
}
}
第二种纯sql形式的查询
- 首先注册表
- 然后使用sql查询
- 最终得出的还是dataFrame的对象
- 其中和rdd的编程没有任何的区别,只不过现在使用sql形式进行处理了而已
package com.hainiu.spark
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestSparkSqlWithCreate {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test create")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
val rdd = sc.textFile("data/a.txt")
.map(t => {
val strs = t.split(" ")
Row(strs(0).toInt, strs(1), strs(2).toInt,strs(3))
})
// rdd + schema
val schema = StructType(
Array(
StructField("id",IntegerType),
StructField("name",StringType),
StructField("age",IntegerType),
StructField("gender",StringType),
)
)
val df = sqlSc.createDataFrame(rdd, schema)
//sql形式查询
//select col from table
df.createTempView("student")
val df1 = sqlSc.sql(
"""
|select count(1) cnt,gender from student group by gender
|""".stripMargin)
df1.createTempView("student1")
val df2 = sqlSc.sql(
"""
|select * from student1 where cnt>1
|""".stripMargin)
df2.show()
df2.printSchema()
}
}