SparkSQL数据模型综合实践
文章目录
- 1. 实战概述
- 2. 实战步骤
- 2.1 创建数据集
- 2.2 创建数据模型对象
- 2.2.1 创建常量
- 2.2.2 创建加载数据方法
- 2.2.3 创建过滤年龄方法
- 2.2.4 创建平均薪水方法
- 2.2.5 创建主方法
- 2.2.6 查看完整代码
- 2.3 运行程序,查看结果
- 3. 实战小结
1. 实战概述
- 在本次实战中,创建一个名为
DataModel
的Spark SQL
数据模型对象,用于演示如何加载数据集、过滤数据以及计算统计信息。首先,在项目根目录下创建data
目录,并在其中创建了包含员工信息的employees.json
文件。然后,创建DataModel
对象,并定义spark
常量以及三个方法:loadData()
、filterAge()
和avgSalary()
,分别用于加载数据、过滤年龄大于20
岁的员工和计算不同性别的平均工资。最后,在main()
方法中调用这些方法来执行数据处理任务。
2. 实战步骤
2.1 创建数据集
- 在项目根目录创建
data
目录
- 在
data
里创建employees.json
{"name": "赵天宇", "gender": "男", "age": "19", "salary": "10000"}
{"name": "钱文博", "gender": "男", "age": "29", "salary": "8000"}
{"name": "孙志强", "gender": "男", "age": "39", "salary": "9000"}
{"name": "李明宇", "gender": "男", "age": "22", "salary": "11000"}
{"name": "周雨涵", "gender": "女", "age": "19", "salary": "14000"}
{"name": "吴美琪", "gender": "女", "age": "35", "salary": "10000"}
2.2 创建数据模型对象
-
创建
net.huawei.practice
包
-
在
practice
子包里创建DataModel
对象
2.2.1 创建常量
- 在
DataModel
对象里创建spark
常量
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("DataModel") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
2.2.2 创建加载数据方法
-
loadData()
方法的需求说明:其主要功能是加载指定路径的文件并生成 DataFrame。方法接受一个字符串类型的参数filePath
,表示文件的路径。执行流程包括使用spark.read.json(filePath)
方法读取 JSON 文件并创建 DataFrame,然后将生成的 DataFrame 返回给调用者。此方法简化了从文件加载数据到 DataFrame 的过程,提高了数据处理的效率和便捷性。 -
创建
loadData()
方法
// 加载数据方法
def loadData(): DataFrame = {
// 加载数据得到数据帧对象
val fileDF = spark.read.json("data/employees.json")
// 返回数据帧对象
fileDF
}
2.2.3 创建过滤年龄方法
filterAge()
方法的需求说明:该方法用于过滤 DataFrame 中年龄大于20
岁的数据,并将结果打印到控制台。方法接受一个 DataFrame 类型的参数employeeDF
,表示原始 JSON 文件对应的 DataFrame 实例对象。方法不返回任何值(返回类型为 Unit)。核心思路包括使用 DataFrame 的filter
方法根据条件过滤数据,然后使用show
方法打印过滤后的结果。此方法简化了数据过滤和展示的过程,有助于快速分析和查看特定条件下的数据。- 创建
filterAge()
方法
// 过滤年龄方法
def filterAge(employeeDF: DataFrame): Unit = {
// 过滤年龄大于20岁的员工
val filterAgeDF = employeeDF.filter("age > 20")
// 显示过滤后的数据
filterAgeDF.show()
}
2.2.4 创建平均薪水方法
avgSalary()
方法,其需求是计算并打印 DataFrame 中不同性别的平均收入。方法接受一个 DataFrame 类型的参数employeeDF
,表示原始 JSON 文件对应的 DataFrame 实例对象。方法不返回任何值(返回类型为 Unit)。核心思路包括使用createOrReplaceTempView
方法将 DataFrame 注册为临时表,然后通过 SQL 查询计算不同性别的平均收入,最后使用show
方法将结果打印到控制台。此方法简化了数据处理流程,有助于快速分析和展示特定统计信息。- 创建
avgSalary()
方法
// 计算平均工资方法
def avgSalary(employeeDF: DataFrame): Unit = {
// 根据输入参数注册临时表
employeeDF.createOrReplaceTempView("employee")
// 计算平均工资
val avgSalaryDF = spark.sql(
s"""
|SELECT
| gender, avg(salary) AS avg_salary
|FROM
| employee
|GROUP BY
| gender
|""".stripMargin
)
// 显示平均工资
avgSalaryDF.show()
}
2.2.5 创建主方法
main()
方法,该方法是程序的入口点,用于调用filterAge
和avgSalary
方法。main
方法接受一个Array[String]
类型的参数args
,这些参数可以在程序执行时从外部传递,使得程序能够动态使用这些参数而无需修改代码。main
方法不返回任何值(返回类型为 Unit)。业务代码的核心思路是首先调用loadData()
方法加载数据,然后将返回的 DataFrame 传递给filterAge
和avgSalary
方法进行处理。这种方法结构清晰,便于管理和扩展程序功能。- 创建
main()
方法
// 主方法
def main(args: Array[String]): Unit = {
// 调用过滤年龄方法
filterAge(loadData())
// 调用计算平均工资方法
avgSalary(loadData())
}
2.2.6 查看完整代码
package net.huawei.practice
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 功能:数据模型演示
* 作者:华卫
* 日期:2025年01月16日
*/
object DataModel {
// 获取或创建Spark会话对象
val spark = SparkSession.builder() // 创建Builder对象
.appName("DataModel") // 设置应用程序名称
.master("local[*]") // 运行模式:本地运行
.getOrCreate() // 获取或创建Spark会话对象
// 加载数据方法
def loadData(): DataFrame = {
// 加载数据得到数据帧对象
val fileDF = spark.read.json("data/employees.json")
// 返回数据帧对象
fileDF
}
// 过滤年龄方法
def filterAge(employeeDF: DataFrame): Unit = {
// 过滤年龄大于20岁的员工
val filterAgeDF = employeeDF.filter("age > 20")
// 显示过滤后的数据
filterAgeDF.show()
}
// 计算平均工资方法
def avgSalary(employeeDF: DataFrame): Unit = {
// 根据输入参数注册临时表
employeeDF.createOrReplaceTempView("employee")
// 计算平均工资
val avgSalaryDF = spark.sql(
s"""
|SELECT
| gender, avg(salary) AS avg_salary
|FROM
| employee
|GROUP BY
| gender
|""".stripMargin
)
// 显示平均工资
avgSalaryDF.show()
}
// 主方法
def main(args: Array[String]): Unit = {
// 调用过滤年龄方法
filterAge(loadData())
// 调用计算平均工资方法
avgSalary(loadData())
}
}
2.3 运行程序,查看结果
- 运行
DataModel
对象
3. 实战小结
- 在本次拓展练习中,我们通过创建一个 SparkSQL 数据模型综合实践项目,深入理解了 Spark 中的数据模型和数据处理流程。首先,我们在项目根目录下创建了
data
目录,并在其中创建了employees.json
文件,用于存储员工数据。接着,我们创建了DataModel
对象,并在其中定义了spark
常量和三个方法:loadData()
、filterAge()
和avgSalary()
,分别用于加载数据、过滤年龄大于20
岁的员工和计算不同性别的平均薪水。在main()
方法中,我们调用了这些方法来执行数据处理任务。通过这个练习,我们不仅学会了如何在 Spark 中操作 DataFrame,还学会了如何将数据处理逻辑封装成方法,提高了代码的可读性和可维护性。此外,我们还学会了如何使用 SQL 查询来分析数据,这在处理结构化数据时非常有用。总的来说,这个练习帮助我们更好地理解了 SparkSQL 的数据模型和数据处理流程,为今后的数据处理工作打下了坚实的基础。