当前位置: 首页 > article >正文

【大数据学习 | Spark-SQL】Spark-SQL编程

上面的是SparkSQL的API操作。

1. 将RDD转化为DataFrame对象

DataFrame:

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数据集可以用SQL查询。

创建方式

准备数据

1 zhangsan 20 male
2 lisi 30 female
3 wangwu 35 male
4 zhaosi 40 female

toDF方式

package com.hainiu.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSql{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
     //环境对象包装
    import sqlSc.implicits._
      //引入环境信息
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0).toInt, strs(1), strs(2).toInt)
      })
    //增加字段信息
    val df = rdd.toDF("id", "name", "age")
    df.show() //展示表数据
    df.printSchema() //展示表格字段信息
  }
}

使用样例类定义schema:

object TestSparkSql{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
         Student(strs(0).toInt, strs(1), strs(2).toInt)
      })

//    val df = rdd.toDF("id", "name", "age")
    val df = rdd.toDF()
    df.show() //打印数据,以表格的形式打印数据
    df.printSchema() //打印表的结构信息
  }
}
case class Student(id:Int,name:String,age:Int)

createDataFrame方式

这种方式需要将rdd和schema信息进行合并,得出一个新的DataFrame对象

package com.hainiu.spark

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSqlWithCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test create")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        Row(strs(0).toInt, strs(1), strs(2).toInt)
      })
//    rdd + schema
    val schema = StructType(
      Array(
        StructField("id",IntegerType),
        StructField("name",StringType),
        StructField("age",IntegerType)
      )
    )
    val df = sqlSc.createDataFrame(rdd, schema)
    df.show()
    df.printSchema()
  }
}

2. SparkSQL的查询方式(推荐第二种写法)

第二个部分关于df的查询

第一种sql api的方式查询

  • 使用的方式方法的形式编程
  • 但是思想还是sql形式
  • 和rdd编程特别相似的一种写法
object TestSql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test sql")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    import sqlSc.implicits._
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        (strs(0).toInt, strs(1), strs(2).toInt,strs(3))
      })
    val df = rdd.toDF("id", "name", "age","gender")
    //select * from student where age >20
    //df.where("age >20")
     //分组聚合
      //df.groupby("gender").sum("age")
    //几个问题
   //聚合函数不能增加别名 聚合函数不能多次聚合  orderby不识别desc 
   // df.groupBy("gender").agg(count("id").as("id"),sum("age").as("age")).orderBy($"age".desc) 
   //字段标识可以是字符串,也可以是字段对象
   //df.orderBy($"age".desc)   
   //df.orderBy(col("age").desc) 
   //df.orderBy(df("age").desc) 
   //增加字段对象可以实现高端操作
   //df.select($"age".+(1)) 
   //join问题
   //val df1 = sc.makeRDD(Array(
    //   (1,100,98),
     //  (2,100,95),
      // (3,90,92),
       //(4,90,93)
   //)).toDF("id","chinese","math")

   //df.join(df1,"id") //字段相同   
   //df.join(df1,df("id")===df1("id"))   
    //窗口函数
    //普通函数 聚合函数  窗口函数 sum|count|rowkey over (partition by gender order by age desc)
    //按照条件分割完毕进行数据截取
    //班级的前两名 每个性别年龄最高的前两个
    //select *,row_number() over (partition by gender order by age desc) rn from table
    import sqlSc.implicits._
    import org.apache.spark.sql.functions._
    df.withColumn("rn",row_number().over(Window.partitionBy("gender").orderBy($"age".desc)))
      .where("rn = 1")
      .show()
  }
}

第二种纯sql形式的查询

  • 首先注册表
  • 然后使用sql查询
  • 最终得出的还是dataFrame的对象
  • 其中和rdd的编程没有任何的区别,只不过现在使用sql形式进行处理了而已
package com.hainiu.spark

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkSqlWithCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test create")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlSc = new SQLContext(sc)
    val rdd = sc.textFile("data/a.txt")
      .map(t => {
        val strs = t.split(" ")
        Row(strs(0).toInt, strs(1), strs(2).toInt,strs(3))
      })
//    rdd + schema
    val schema = StructType(
      Array(
        StructField("id",IntegerType),
        StructField("name",StringType),
        StructField("age",IntegerType),
        StructField("gender",StringType),
      )
    )
    val df = sqlSc.createDataFrame(rdd, schema)
    //sql形式查询
    //select col from table
    df.createTempView("student")
    val df1 = sqlSc.sql(
      """
        |select count(1) cnt,gender from student group by gender
        |""".stripMargin)
    df1.createTempView("student1")
    val df2 = sqlSc.sql(
      """
        |select * from student1 where cnt>1
        |""".stripMargin)
    df2.show()
    df2.printSchema()
  }
}

http://www.kler.cn/a/413127.html

相关文章:

  • 苹果系统中利用活动监视器来终止进程
  • digit_eye开发记录(2): Python读取MNIST数据集
  • 了解 CSS position 属性
  • elasticsearch报错fully-formed single-node cluster with cluster UUID
  • 【软考速通笔记】系统架构设计师③——信息安全技术基础知识
  • 网上蛋糕售卖店管理系(Java+SpringBoot+MySQL)
  • 如何做好一份技术文档?
  • 新型大语言模型的预训练与后训练范式,阿里Qwen
  • 网络安全审计机制与实现技术
  • Unity3D Lua如何支持面向对象详解
  • 使用 pycharm 新建不使用 python 虚拟环境( venv、conda )的工程
  • 摄像头原始数据读取——gstreamer(gst_parse_launch)
  • UI设计-色彩、层级、字体、边距(一)
  • java脚手架系列16-AI大模型集成
  • 使用Hutool读取大Excel
  • C++学习日记---第14天(蓝桥杯备赛)
  • 前端实现把整个页面转成PDF保存到本地(DOM转PDF)
  • 梧桐数据库的高效索引技术分析
  • Rust语言俄罗斯方块(漂亮的界面案例+详细的代码解说+完美运行)
  • 前端框架 react 性能优化
  • 【网络安全设备系列】10、安全审计系统
  • 在WSL 2 (Ubuntu 22.04)安装Docker Ce 启动错误解决
  • 【H2O2|全栈】Node.js(1)
  • 5G Multicast/Broadcast Services(MBS) (五)
  • (计算机网络)期末
  • 在 Spring Boot 中构造 API 响应的最佳实践