当前位置: 首页 > article >正文

SparkSQL数据源与数据存储综合实践

文章目录

  • 1. 打开项目
  • 2. 查看数据集
    • 2.1 查看JSON格式数据
    • 2.2 查看CSV格式数据
    • 2.3 查看TXT格式数据
  • 3. 添加单元测试依赖
  • 4. 创建数据加载与保存对象
    • 4.1 创建Spark会话对象
    • 4.2 创建加载JSON数据方法
    • 4.3 创建加载CSV数据方法
    • 4.4 创建加载Text数据方法
    • 4.5 创建加载JSON数据扩展方法
    • 4.6 创建加载CSV数据扩展方法
    • 4.7 创建加载Text数据扩展方法
    • 4.8 创建保存文本文件方法
    • 4.9 查看程序完整代码
  • 5. 实战小结

1. 打开项目

  • 打开SparkSQLDataSource项目
    在这里插入图片描述

2. 查看数据集

2.1 查看JSON格式数据

  • 查看users.json文件
    在这里插入图片描述
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}

2.2 查看CSV格式数据

  • 查看users.csv文件
    在这里插入图片描述
name,gender,age
李小玲,,45
童安格,,26
陈燕文,,18
王晓明,,32
张丽华,,29
刘伟强,,40
赵静怡,,22
孙强东,,35

2.3 查看TXT格式数据

  • 查看users.txt文件
    在这里插入图片描述
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35

3. 添加单元测试依赖

  • pom.xml里添加单元测试框架依赖
    在这里插入图片描述
<dependency>                                    
    <groupId>junit</groupId>                    
    <artifactId>junit</artifactId>              
    <version>4.13.2</version>                   
</dependency>                                   
  • 刷新项目依赖
    在这里插入图片描述

4. 创建数据加载与保存对象

  • 创建net.huawei.practice
    在这里插入图片描述
  • practice子包里创建DataLoadAndSave对象
    在这里插入图片描述
  • 创建DataLoadAndSave伴生类
    在这里插入图片描述

4.1 创建Spark会话对象

  • 创建spark常量
    在这里插入图片描述
// 获取或创建Spark会话对象                                  
val spark = SparkSession.builder() // 创建Builder对象  
  .appName("DataLoadAndSave") // 设置应用程序名称          
  .master("local[*]") // 运行模式:本地运行                 
  .getOrCreate() // 获取或创建Spark会话对象                 

4.2 创建加载JSON数据方法

  • 创建loadJSONData()方法
    在这里插入图片描述
// 加载JSON数据方法                                       
def loadJSONData(filePath: String): DataFrame = {   
  spark.read.json(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadJSONData()方法
    在这里插入图片描述
@Test                                                      
def testLoadJSONData(): Unit = {                           
  // 加载JSON数据                                              
  val df = DataLoadAndSave.loadJSONData("data/users.json") 
  // 显示数据                                                  
  df.show()                                                
}                                                          
  • 运行testLoadJSONData()测试方法,查看结果
    在这里插入图片描述

4.3 创建加载CSV数据方法

  • 创建loadCSVData()方法
    在这里插入图片描述
// 加载CSV数据方法                                           
def loadCSVData(filePath: String): DataFrame = {       
  spark.read                                           
    .option("header", "true")                          
    .option("inferSchema", "true")                     
    .csv(filePath)                                     
}                                                      
  • 在伴生类里创建单元测试方法testLoadCSVData()方法
    在这里插入图片描述
@Test                                                       
def testLoadCSVData(): Unit = {                             
  // 加载CSV数据                                                
  val df = DataLoadAndSave.loadCSVData("data/users.csv")    
  // 显示数据                                                   
  df.show()                                                 
}                                                           
  • 运行testLoadCSVData()测试方法,查看结果
    在这里插入图片描述

4.4 创建加载Text数据方法

  • 创建loadTextData()方法
    在这里插入图片描述
// 加载TEXT数据方法                                       
def loadTextData(filePath: String): DataFrame = {   
  spark.read.text(filePath)                         
}                                                   
  • 在伴生类里创建单元测试方法testLoadTextData()方法
    在这里插入图片描述
  • 运行testLoadTextData()测试方法,查看结果
    在这里插入图片描述

4.5 创建加载JSON数据扩展方法

  • 创建loadJSONDataExpand()方法
    在这里插入图片描述
// 加载JSON数据扩展方法                                         
def loadJSONDataExpand(filePath: String): DataFrame = { 
  spark.read.format("json").load(filePath)              
}                                                       
  • 在伴生类里创建单元测试方法testLoadJSONDataExpand()方法
    在这里插入图片描述
  • 运行testLoadJSONDataExpand()测试方法,查看结果
    在这里插入图片描述

4.6 创建加载CSV数据扩展方法

  • 创建loadCSVDataExpand()方法
    在这里插入图片描述
// 加载CSV数据扩展方法                                            
def loadCSVDataExpand(filePath: String): DataFrame = {    
  spark.read.format("csv")                                
    .option("header", "true")                             
    .option("inferSchema", "true")                        
    .load(filePath)                                       
}                                                         
  • 在伴生类里创建单元测试方法testLoadCSVDataExpand()方法
    在这里插入图片描述
  • 运行testLoadCSVDataExpand()测试方法,查看结果
    在这里插入图片描述

4.7 创建加载Text数据扩展方法

  • 创建loadTextDataExpand()方法
    在这里插入图片描述
//  加载TEXT数据扩展方法                                          
def loadTextDataExpand(filePath: String): DataFrame = {   
  spark.read.format("text").load(filePath)                
}                                                         
  • 在伴生类里创建单元测试方法testLoadTextDataExpand()方法
    在这里插入图片描述
  • 运行testLoadTextDataExpand()测试方法,查看结果
    在这里插入图片描述

4.8 创建保存文本文件方法

  • 创建saveTextFile()方法
    在这里插入图片描述
// 保存数据到文本文件方法                                                   
def saveTextFile(inputPath: String, outputPath: String): Unit = {
  // 加载文本数据                                                      
  val df = spark.read.format("text").load(inputPath)             
  // 保存文本数据                                                      
  df.write.mode("overwrite").format("text").save(outputPath)     
}                                                                
  • 在伴生类里创建单元测试方法testSaveTextFile()方法
    在这里插入图片描述
  • 运行testSaveTextFile()测试方法,查看结果
    在这里插入图片描述

4.9 查看程序完整代码

package net.huawei.practice

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test

/**
 * 功能:数据加载与保存
 * 作者:华卫
 * 日期:2025年01月18日
 */
object DataLoadAndSave {
  // 获取或创建Spark会话对象
  val spark = SparkSession.builder() // 创建Builder对象
    .appName("DataLoadAndSave") // 设置应用程序名称
    .master("local[*]") // 运行模式:本地运行
    .getOrCreate() // 获取或创建Spark会话对象

  // 加载JSON数据方法
  def loadJSONData(filePath: String): DataFrame = {
    spark.read.json(filePath)
  }

  // 加载CSV数据方法
  def loadCSVData(filePath: String): DataFrame = {
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(filePath)
  }

  // 加载TEXT数据方法
  def loadTextData(filePath: String): DataFrame = {
    spark.read.text(filePath)
  }

  // 加载JSON数据扩展方法
  def loadJSONDataExpand(filePath: String): DataFrame = {
    spark.read.format("json").load(filePath)
  }

  // 加载CSV数据扩展方法
  def loadCSVDataExpand(filePath: String): DataFrame = {
    spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(filePath)
  }

  //  加载TEXT数据扩展方法
  def loadTextDataExpand(filePath: String): DataFrame = {
    spark.read.format("text").load(filePath)
  }

  // 保存数据到文本文件方法
  def saveTextFile(inputPath: String, outputPath: String): Unit = {
    // 加载文本数据
    val df = spark.read.format("text").load(inputPath)
    // 保存文本数据
    df.write.mode("overwrite").format("text").save(outputPath)
  }
}

// 伴生类
class DataLoadAndSave {
  @Test
  def testLoadJSONData(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONData("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVData(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVData("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextData(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextData("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadJSONDataExpand(): Unit = {
    // 加载JSON数据
    val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadCSVDataExpand(): Unit = {
    // 加载CSV数据
    val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")
    // 显示数据
    df.show()
  }

  @Test
  def testLoadTextDataExpand(): Unit = {
    // 加载TEXT数据
    val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")
    // 显示数据
    df.show()
  }

  @Test
  def testSaveTextFile(): Unit = {
    // 保存数据到文本文件
    DataLoadAndSave.saveTextFile("data/users.txt", "result/users")
  }
}

5. 实战小结

  • 在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()loadCSVData()loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。

http://www.kler.cn/a/510015.html

相关文章:

  • Python与PyTorch的浅拷贝与深拷贝
  • 微透镜阵列精准全检,白光干涉3D自动量测方案提效70%
  • npm ERR! code CERT_HAS_EXPIRED
  • Java定时任务不明原因挂掉(定时任务挂掉)以及建议
  • Linux SUID提权
  • .NET 学习:从基础到进阶的全面指南
  • [Effective C++]条款47 萃取器
  • 洛谷P4017 最大食物链计数(图的拓扑排序)
  • 从新手到高手的蜕变:MySQL 约束进阶全攻略
  • vue 实现打印功能
  • 期望最大化算法:机器学习中的隐变量与参数估计的艺术
  • AIGC - 深度洞察如何对大模型进行微调以满足特定需求
  • RPA编程实践:Electron实践开始
  • vllm稳定输出json
  • 素描风格渲染
  • 基于Java+Sql Server实现的(GUI)学籍管理系统
  • springboot基于微信小程序的传统美食文化宣传平台小程序
  • docker 基础语法学习,K8s基础语法学习,零基础学习
  • python-leetcode-存在重复元素 II
  • Linux shell zip 命令实现不切换当前终端的工作目录打包另一个路径下的文件和文件夹
  • TCP 重传演进:TCP RACK Timer 能替代 RTO 吗
  • 【触想智能】工业电脑一体机在数控机床设备上应用的注意事项以及工业电脑日常维护知识分享
  • 《汽车与驾驶维修》是什么级别的期刊?是正规期刊吗?能评职称吗?
  • 使用 Java 和 FreeMarker 实现自动生成供货清单,动态生成 Word 文档,简化文档处理流程。
  • Vue.js组件开发全解析
  • Excel中函数SIGN()的用法