当前位置: 首页 > article >正文

Spark SQL大数据分析快速上手-DataFrame应用体验

【图书介绍】《Spark SQL大数据分析快速上手》-CSDN博客

《Spark SQL大数据分析快速上手》【摘要 书评 试读】- 京东图书

大数据与数据分析_夏天又到了的博客-CSDN博客

本节主要介绍如何使用DataFrame进行编程。

4.1.1  SparkSession

在旧版本中,Spark SQL提供两种SQL查询起始点:一个叫作SQLContext,用于Spark自己提供的SQL查询;一个叫作HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合。因此,在SQLContext和HiveContext上可用的API,在SparkSession上同样可以使用。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

当我们使用Spark Shell的时候,Spark会自动创建一个叫作spark的SparkSession,就像以前可以自动获取一个sc来表示SparkContext一样,如图4-1所示。

图4-1  自动创建SparkSession

4.1.2  DataFrame应用

Spark SQL的DataFrame API允许我们使用DataFrame而不必去注册临时表或者生成SQL表达式。DataFrame API既有转换操作,也有行动操作;DataSet API则提供了更加函数式的API。

1. 创建DataFrame

有了SparkSession之后,可以通过以下3种方式来创建DataFrame:

  • 通过Spark的数据源来创建。
  • 通过已知的RDD来创建。
  • 通过查询一个Hive表来创建。

Spark支持的数据源如图4-2所示。

图4-2  Spark支持的数据源

通过Spark数据源创建DataFrame的代码如下:

// 读取 JSON 文件
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

// 展示结果
scala> df.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

其中,employees.json文件内容如下:

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
2. DataFrame语法风格

1)SQL语法风格

SQL语法风格是指我们查询数据的时候可以使用SQL语句。这种SQL语句风格的查询必须有临时视图或者全局视图来辅助。

创建视图的数据来源于people.json,其内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

创建临时视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/ src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建全局视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createGlobalTempView("people")

scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res31.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2)DSL语法风格

DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据。可以在Scala、Java、Python和R中使用DSL。使用DSL语法风格就不必创建临时视图了。

(1)查看schema信息,示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

(2)使用DSL查询,示例代码如下:

只查询name列数据:

scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|

查询name和age列数据:

scala> df.select("name", "age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

查询name和age + 1的数据:

scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

查询age大于20的数据:

scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

按照age分组,查看数据条数:

scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
3. RDD和DataFrame的交互

1)从RDD到DataFrame

涉及RDD、DataFrame、DataSet之间的操作时,需要进行导入,即import spark.implicits._。这里的spark不是包名,而是表示SparkSession的那个对象,所以必须先创建SparkSession对象再导入;implicits是一个内部对象。

首先创建一个RDD:

scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24

然后进行转换,转换有3种方法:手动转换、通过样例类反射转换和通过API的方式转换。

(1)手动转换。

示例代码如下:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26

// 转换为DataFrame的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(2)通过样例类反射转换。

首先创建样例类:

scala> case class People(name :String, age: Int)
defined class People

然后使用样例把 RDD 转换成DataFrame:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28

scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(3)通过API的方式转换。

通过API方式转换不能在spark命令行下进行,需要编写完整的Scala程序代码,示例代码  如下:

代码4-1  DataFrameDemo.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameDemo {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("Word Count")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
        // 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
        val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
        // 创建 StructType 类型
        val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
        val df: DataFrame = spark.createDataFrame(rowRdd, types)
        df.show

    }
}

2)从DataFrame到RDD

直接调用DataFrame的rdd方法就能完成转换。示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25

scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])


http://www.kler.cn/a/388977.html

相关文章:

  • 在 Service Worker 中caches.put() 和 caches.add()/caches.addAll() 方法他们之间的区别
  • C++单例模式实现
  • JavaScript Cookie 与 服务器生成的 Cookie 的区别与应用
  • windows 11编译安装ffmpeg(包含ffplay)
  • 考研季来啦!考研过程中有哪些事情需要避坑?
  • .NET中通过C#实现Excel与DataTable的数据互转
  • jmeter常用配置元件介绍总结之用linux服务器压测
  • 如何让ffmpeg运行时从当前目录加载库,而不是从/lib64
  • React的概念以及发展前景如何?
  • 2024-2025第九届华为ICT大赛中国创新赛问题解答
  • 【Python】Pygame实战:实现基础跑酷游戏机(附源码)
  • Redis设计与实现 学习笔记 第十六章 Sentinel
  • 前端实现文件下载常用几种方式
  • 计算机课程管理:Spring Boot实现的工程认证解决方案
  • 中仕公考:2025年各地区公务员招考公告汇总
  • 【SpringBoot】18 上传文件到数据库(Thymeleaf + MySQL)
  • 关系数据的可视化——Python大数据可视化
  • Android 开启混淆R8编译问题处理
  • nVisual前端目录结构
  • [MySQL#14] 视图 | 用户管理 | 权限设置
  • RTOS IOT 结构
  • 速盾:游戏盾的功能和原理详解
  • 【HarmonyOS NEXT】一次开发多端部署(以轮播图、Tab栏、列表为例,配合栅格布局与媒体查询,进行 UI 的一多开发)
  • 【SQL】在 SQL Server 中创建数据源是 MySQL 数据表的视图
  • 如何编译安装LNMP环境
  • 论文阅读:人工智能赋能源网荷储协同互动的应用及展望