文章目录
- 1. 打开项目
- 2. 查看数据集
- 2.1 查看JSON格式数据
- 2.2 查看CSV格式数据
- 2.3 查看TXT格式数据
- 3. 添加单元测试依赖
- 4. 创建数据加载与保存对象
- 4.1 创建Spark会话对象
- 4.2 创建加载JSON数据方法
- 4.3 创建加载CSV数据方法
- 4.4 创建加载Text数据方法
- 4.5 创建加载JSON数据扩展方法
- 4.6 创建加载CSV数据扩展方法
- 4.7 创建加载Text数据扩展方法
- 4.8 创建保存文本文件方法
- 4.9 查看程序完整代码
- 5. 实战小结
1. 打开项目
- 打开
SparkSQLDataSource
项目
2. 查看数据集
2.1 查看JSON格式数据
- 查看
users.json
文件
{"name": "李小玲", "gender": "女", "age": 45}
{"name": "童安格", "gender": "男", "age": 26}
{"name": "陈燕文", "gender": "女", "age": 18}
{"name": "王晓明", "gender": "男", "age": 32}
{"name": "张丽华", "gender": "女", "age": 29}
{"name": "刘伟强", "gender": "男", "age": 40}
{"name": "赵静怡", "gender": "女", "age": 22}
{"name": "孙强东", "gender": "男", "age": 35}
2.2 查看CSV格式数据
- 查看
users.csv
文件
name,gender,age
李小玲,女,45
童安格,男,26
陈燕文,女,18
王晓明,男,32
张丽华,女,29
刘伟强,男,40
赵静怡,女,22
孙强东,男,35
2.3 查看TXT格式数据
- 查看
users.txt
文件
李小玲 女 45
童安格 男 26
陈燕文 女 18
王晓明 男 32
张丽华 女 29
刘伟强 男 40
赵静怡 女 22
孙强东 男 35
3. 添加单元测试依赖
- 在
pom.xml
里添加单元测试框架依赖
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
- 刷新项目依赖
4. 创建数据加载与保存对象
- 创建
net.huawei.practice
包
- 在
practice
子包里创建DataLoadAndSave
对象
- 创建
DataLoadAndSave
伴生类
4.1 创建Spark会话对象
- 创建
spark
常量
val spark = SparkSession.builder()
.appName("DataLoadAndSave")
.master("local[*]")
.getOrCreate()
4.2 创建加载JSON数据方法
- 创建
loadJSONData()
方法
def loadJSONData(filePath: String): DataFrame = {
spark.read.json(filePath)
}
- 在伴生类里创建单元测试方法
testLoadJSONData()
方法
@Test
def testLoadJSONData(): Unit = {
val df = DataLoadAndSave.loadJSONData("data/users.json")
df.show()
}
- 运行
testLoadJSONData()
测试方法,查看结果
4.3 创建加载CSV数据方法
- 创建
loadCSVData()
方法
def loadCSVData(filePath: String): DataFrame = {
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(filePath)
}
- 在伴生类里创建单元测试方法
testLoadCSVData()
方法
@Test
def testLoadCSVData(): Unit = {
val df = DataLoadAndSave.loadCSVData("data/users.csv")
df.show()
}
- 运行
testLoadCSVData()
测试方法,查看结果
4.4 创建加载Text数据方法
- 创建
loadTextData()
方法
def loadTextData(filePath: String): DataFrame = {
spark.read.text(filePath)
}
- 在伴生类里创建单元测试方法
testLoadTextData()
方法
- 运行
testLoadTextData()
测试方法,查看结果
4.5 创建加载JSON数据扩展方法
- 创建
loadJSONDataExpand()
方法
def loadJSONDataExpand(filePath: String): DataFrame = {
spark.read.format("json").load(filePath)
}
- 在伴生类里创建单元测试方法
testLoadJSONDataExpand()
方法
- 运行
testLoadJSONDataExpand()
测试方法,查看结果
4.6 创建加载CSV数据扩展方法
- 创建
loadCSVDataExpand()
方法
def loadCSVDataExpand(filePath: String): DataFrame = {
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(filePath)
}
- 在伴生类里创建单元测试方法
testLoadCSVDataExpand()
方法
- 运行
testLoadCSVDataExpand()
测试方法,查看结果
4.7 创建加载Text数据扩展方法
- 创建
loadTextDataExpand()
方法
def loadTextDataExpand(filePath: String): DataFrame = {
spark.read.format("text").load(filePath)
}
- 在伴生类里创建单元测试方法
testLoadTextDataExpand()
方法
- 运行
testLoadTextDataExpand()
测试方法,查看结果
4.8 创建保存文本文件方法
- 创建
saveTextFile()
方法
def saveTextFile(inputPath: String, outputPath: String): Unit = {
val df = spark.read.format("text").load(inputPath)
df.write.mode("overwrite").format("text").save(outputPath)
}
- 在伴生类里创建单元测试方法
testSaveTextFile()
方法
- 运行
testSaveTextFile()
测试方法,查看结果
4.9 查看程序完整代码
package net.huawei.practice
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.Test
object DataLoadAndSave {
val spark = SparkSession.builder()
.appName("DataLoadAndSave")
.master("local[*]")
.getOrCreate()
def loadJSONData(filePath: String): DataFrame = {
spark.read.json(filePath)
}
def loadCSVData(filePath: String): DataFrame = {
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(filePath)
}
def loadTextData(filePath: String): DataFrame = {
spark.read.text(filePath)
}
def loadJSONDataExpand(filePath: String): DataFrame = {
spark.read.format("json").load(filePath)
}
def loadCSVDataExpand(filePath: String): DataFrame = {
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(filePath)
}
def loadTextDataExpand(filePath: String): DataFrame = {
spark.read.format("text").load(filePath)
}
def saveTextFile(inputPath: String, outputPath: String): Unit = {
val df = spark.read.format("text").load(inputPath)
df.write.mode("overwrite").format("text").save(outputPath)
}
}
class DataLoadAndSave {
@Test
def testLoadJSONData(): Unit = {
val df = DataLoadAndSave.loadJSONData("data/users.json")
df.show()
}
@Test
def testLoadCSVData(): Unit = {
val df = DataLoadAndSave.loadCSVData("data/users.csv")
df.show()
}
@Test
def testLoadTextData(): Unit = {
val df = DataLoadAndSave.loadTextData("data/users.txt")
df.show()
}
@Test
def testLoadJSONDataExpand(): Unit = {
val df = DataLoadAndSave.loadJSONDataExpand("data/users.json")
df.show()
}
@Test
def testLoadCSVDataExpand(): Unit = {
val df = DataLoadAndSave.loadCSVDataExpand("data/users.csv")
df.show()
}
@Test
def testLoadTextDataExpand(): Unit = {
val df = DataLoadAndSave.loadTextDataExpand("data/users.txt")
df.show()
}
@Test
def testSaveTextFile(): Unit = {
DataLoadAndSave.saveTextFile("data/users.txt", "result/users")
}
}
5. 实战小结
- 在本次实战中,我们通过
SparkSQLDataSource
项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave
对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()
、loadCSVData()
和loadTextData()
,分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()
方法灵活加载数据,并实现了数据保存功能,如saveTextFile()
方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。