SparkSQL编程实践
文章目录
- SparkSQL编程实践
- 1.1. 编程模型介绍
- 1.2. SparkSQL编程
- 1.2.1. 第三方库安装
- 1.2.2. SparkSQL程序的结构
- 1.2.3. SparkSQL执行模式
- 1.2.3.1. Local模式
- 1.2.3.2. 集群模式
- 1.2.4. 数据加载
- 1.2.4.1. 通过RDD创建DataFrame
- 1.2.4.2. 通过读取数据外部数据创建DataFrame
- 标准读取方式
- 读取JSON数据
- 读取CSV数据
- 读取Parquet数据
- 读取orc数据
- 读取txt数据
- 读取数据库数据
- 1.2.5. DSL风格处理数据
- 1.2.5.1. printSchema
- 1.2.5.2. show
- 1.2.5.3. filter
- 1.2.5.4. where
- 1.2.5.5. select
- 1.2.5.6. groupBy
- 1.2.5.7. dropDuplicates
- 1.2.5.8. dropna
- 1.2.5.9. fillna
- 1.2.6. SQL风格处理数据
- 1.2.7. 数据落地
- 1.2.7.1. 标准落地方式
- 1.2.7.2. 落地JSON数据
- 1.2.7.3. 落地CSV数据
- 1.2.7.4. 落地Parquet数据
- 1.2.7.5. 落地orc数据
- 1.3.7.6. 落地txt数据
- 1.3.7.7. 落地数据库数据
SparkSQL编程实践
1.1. 编程模型介绍
在SparkCore中使用到的编程模型是RDD。RDD是一种弹性分布式数据集,是对我们需要处理的数据进行的抽象。其中可以存储任意的数据,没有标准的数据结构。
而SparkSQL使用到的编程模型是DataFrame,是在RDD的基础上添加了Schema信息。所谓的Scheme信息指的是描述数据的信息,也可以认为是“元数据”,DataFrame的前身就是SchemaRDD。
假设RDD中的几行数据长这样;
1 | 张三 | 20 |
---|---|---|
2 | 李四 | 21 |
3 | 王五 | 22 |
那么在DataFrame中数据就变成这样;
ID:Int | Name:String | Age:Int |
---|---|---|
1 | 张三 | 20 |
2 | 李四 | 21 |
3 | 王五 | 22 |
从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。
其实SparkSQL中的DataFrame是借鉴了Python的Pandas库中的DataFrame的思想而来的,都是将结构化的数据映射成为一张二维的表格对数据进行处理。只不过Pandas中的DataFrame只能够处理本地的一些数据文件,而SparkSQL中的DataFrame却是能够处理分布式数据集合的编程模型。
1.2. SparkSQL编程
1.2.1. 第三方库安装
Spark本身是使用Scala语言来编写的,原生支持Scala、Java编程语言。而我们现在需要使用Python来进行操作,就需要下载安装第三方库pyspark,这个库是Spark官方发布的一个专门使用Python来操作Spark的库,我们直接使用pip就可以安装。
pip install pyspark==3.1.2
**注意:**在安装的时候,一定要与你的Spark的版本是一致的,否则会出现兼容性的问题。
1.2.2. SparkSQL程序的结构
在SparkSQL的程序中,程序的入口是SparkSession对象。在上方的Spark任务提交的部分,提到过一个比较重要的类是SparkContext,而在SparkSession中,对SparkContext进行了封装。
# 导入依赖库
from pyspark.sql import SparkSession
# 创建程序入口SparkSession对象
# .builder : 获取到SparkSessionBuilder对象,用于创建SparkSession对象
# .master : 设置Master是谁,可以设置为local表示本地模式
# .appName : Spark程序的名字
# .getOrCreate : 根据前面设置的属性,用来创建一个SparkSession对象
spark = SparkSession.builder.master("local[*]").appName("spark-sql").getOrCreate()
# 使用SparkSQL对数据进行处理的过程
# 在程序结束的时候,释放SparkSession对象,释放资源
spark.stop()
1.2.3. SparkSQL执行模式
1.2.3.1. Local模式
将master设置为local,即可开启local模式,使用这种模式进行程序的开发。在local模式下,一般用来做程序的测试的用途。
对于分布式计算程序的开发,local模式其实是必不可少的。我们需要在开发的过程中,不断的使用测试数据来测试我们的业务逻辑是否正确,对程序进行及时的调整。此时一般会使用与真实的数据相同格式的、数据量比较少的测试数据,来测试程序的正确性。测试通过之后,才会放到集群上使用Standalone或者YARN模式来运行。
在这种模式下,local有几种写法:
- local : 开启一个线程,相当于local[1]。
- local[N] : 自己设定线程的数量为N个,例如local[2]就表示使用2个线程。
- local[*] : 按照CPU的核数来设置线程数量。
同时,如果要使用local模式的话,需要在本地安装好Spark的本地环境。其实就是将Spark的安装包在本地解压,然后配置环境变量即可。因为Spark的程序在执行的时候,其实是需要使用到spark-submit这个脚本的。
1.2.3.2. 集群模式
如果想要将自己的代码提交到集群上,以Standalone或者YARN模式来执行的话,只需要将代码中SparkSession在创建时候的.master去掉即可。以Standalone模式或者YARN模式来提交任务即可,参考Spark任务提交的部分。
在后续的课程中,默认都是以local模式来运行程序。因为在local模式下,可以快速的对程序进行测试,快速的定位问题、解决问题。如果需要提交到集群运行,按照上述方式修改提交方式即可。
1.2.4. 数据加载
在我们使用到SparkSQL的时候,需要使用SparkSession作为程序的入口,同时以DataFrame作为我们的编程模型。其实就是使用DataFrame来抽象描述我们需要处理的数据。那么DataFrame是怎么创建出来的呢?
- 通过RDD对象进行创建。
- 通过读取数据文件进行创建。
1.2.4.1. 通过RDD创建DataFrame
RDD是SparkCore中的编程模型,虽然我们不使用SparkCore来进行编程开发,但是还是有必要了解一下如何通过RDD进行DataFrame的构建。通过RDD创建DataFrame有两种方式:
- <SparkSession对象>.createDataFrame()
- <RDD对象>.toDF()
# 1. 构建RDD
rdd = spark.sparkContext.parallelize([
("xiaoming", 19, 99, "male"),
("xiaohong", 20, 98, "female"),
("xiaogang", 20, 89, "male"),
("xiaojuan", 21, 88, "female")
])
# 2. 通过RDD构建DataFrame,构建方式:createDataFrame
'''
# 直接通过createDataFrame创建DataFrame
df = spark.createDataFrame(rdd)
'''
df = rdd.toDF()
df.printSchema()
df.show()
但是这样创建出来的DataFrame对象的Schema信息中,每一个字段的类型可以由值进行推导,但是字段的名字无法手动定义,只能使用默认的_0、_1等来描述,可读性非常差。因此我们就可以在创建DataFrame的时候,同时设定表头信息。
-
**使用数组的方式:**可以设置字段的名字,类型是由值来进行推导。
# 通过RDD构建DataFrame,通过数组,定义表头 df1 = spark.createDataFrame(rdd, schema=['name', 'age', 'score', 'gender']) df2 = rdd.toDF(schema=['name', 'age', 'score', 'gender']) """ root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- score: long (nullable = true) |-- gender: string (nullable = true) +--------+---+-----+------+ | name|age|score|gender| +--------+---+-----+------+ |xiaoming| 19| 99| male| |xiaohong| 20| 98|female| |xiaogang| 20| 89| male| |xiaojuan| 21| 88|female| +--------+---+-----+------+ """
-
**使用StructTypes:**可以设置字段的名字、类型等属性。
# StductTypes需要导入模块使用 from pyspark.sql.types import StructType, StringType, IntegerType # 实例化Schema信息 schema = StructType()\ .add('name', StringType(), True)\ .add('age', IntegerType(), True)\ .add('score', IntegerType(), True)\ .add('gender', StringType(), True) # 2. 通过RDD构建DataFrame,通过数组,定义表头 df1 = spark.createDataFrame(rdd, schema=schema) df2 = rdd.toDF(schema=schema)
1.2.4.2. 通过读取数据外部数据创建DataFrame
标准读取方式
# 加载数据的标准流程,是使用load函数来加载。此时默认加载的是parquet格式的数据。
df_parquet = spark.read.load("../../../data/users.parquet")
# 如果需要加载其他格式的数据,需要指定format格式。
df_json = spark.read.load("../../../data/account.json", format="json")
df_csv = spark.read.load("../../../data/country.csv", format="csv")
读取JSON数据
# 读取JSON格式的数据,读取JSON数据创建的DataFrame对象自带Schema信息
df_json = spark.read.json("../../../data/account.json")
读取CSV数据
# 读取CSV格式的数据,CSV在读取的时候,可以设置分隔符的格式和是否包含头部信息
# 分隔符:CSV文件中,默认的列分隔符是逗号,也可以替换成为其他的分隔符
# 头部信息:第一行的数据是否作为字段的Schema的名字,如果不包含,字段的名字默认为_c0, _c1...
df_csv = spark.read.csv("../../../data/ip-pprovince-count.csv", sep=";", header=True)
读取Parquet数据
# 读取Parquet格式的数据
df_parquet = spark.read.parquet("../../../data/users.parquet")
读取orc数据
# 读取orc格式的数据
df_orc = spark.read.orc("../../../data/student.orc")
读取txt数据
# 读取txt格式的数据,这种方式读取的数据,只能将一行的数据读成一列
df_txt = spark.read.text("../../../data/dailykey.txt")
读取数据库数据
# 读取数据库的数据
# 读取的时候需要使用JDBC来读取,因此需要将JDBC的jar包放到本地spark目录下的jars文件夹下
# 如果需要将代码提交到集群上运行,需要将JDBC的jar包放到集群的Spark目录下的jars文件夹下
df_jdbc = spark.read.jdbc(url="jdbc:mysql://localhost:3306/pydb",
table="bank_account",
properties={
"user": "root",
"password": "123456",
"driver": "com.mysql.cj.jdbc.Driver"
})
1.2.5. DSL风格处理数据
函数 | 函数描述 |
---|---|
printSchema() | 打印DataFrame的Schema信息 |
show() | 打印参数数量指定的行数据,默认20行 |
filter() | 对DataFrame中的数据进行过滤,保留满足条件的数据,等同于where() |
where() | 对DataFrame中的数据进行过滤,保留满足条件的数据,等同于filter() |
select() | 查询DataFrame中指定的列 |
groupBy() | 对DataFrame的数据,按照指定的字段进行分组 |
dropDuplicates() | 对DataFrame的数据进行去重,保留重复的第一条数据 |
dropna() | 删除有缺失值的行,删除逻辑可以设置 |
fillna() | 填充缺失值数据 |
1.2.5.1. printSchema
DataFrame将结构化的数据抽象成为一张表,使用printSchema打印这个虚拟的表的结构。
可以查看到“字段”的名字和类型。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 打印Schema
df_json.printSchema()
1.2.5.2. show
打印DataFrame描述的若干行的数据,默认打印20行。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 默认打印20行的数据
df_json.show()
# 打印5行数据
df_json.show(5)
1.2.5.3. filter
对DataFrame的数据按照条件进行过滤,保留满足条件的数据,删除不满足条件的数据。得到一个新的DataFrame。
filter的参数可以是一个字符串的判断条件,也可以是Column对象的比较。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 使用字符串描述规则
df_json.filter("age > 18").show()
# Column:是一个用来描述DataFrame中的"列、字段"的类,使用df_json['字段名']来获取
df_json.filter(df_json['age'] > 19).show()
1.2.5.4. where
等同于filter。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 使用字符串描述规则
df_json.where("age > 18").show()
# Column:是一个用来描述DataFrame中的"列、字段"的类,使用df_json['字段名']来获取
df_json.where(df_json['age'] > 19).show()
1.2.5.5. select
查询DataFrame中的指定的字段。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# select的参数可以是字符串,也可以是Column对象
# 使用字符串参数
df_json.select('name', 'age').show()
# 使用Column对象作为参数
df_json.select(df_json['name'], df_json['age']).show()
# 如果需要对查询的字段进行操作,例如运算、别名等,必须要使用Column对象
df_json.select((df_json['age'] + 10).alias('new_age')).show()
1.2.5.6. groupBy
对DataFrame的数据,按照指定的字段进行分组,返回值是一个GroupedData。
GroupedData对象是一个特殊的DataFrame,是对DataFrame的数据进行分组之后的结果,其中记录了DataFrame中的数据分组之后形成的数据。对GroupedData对象,我们可以使用一些聚合函数来操作,例如max、min、count、sum、avg等。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 按照省份进行分组
df = df_json.groupby('province')
# 每个省份的最大年龄
df.max('age').show()
# 每个省份的最小身高
df.min('height').show()
# 每个省份有多少人
df.count().show()
# 每个省份的总年龄
df.sum('age').show()
# 每个省份的平均身高
df.avg('height').show()
1.2.5.7. dropDuplicates
对DataFrame的数据进行去重,保留重复的第一条数据。
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 去重,保留重复的第一条数据(每一列的值都得相同)
df_json.dropDuplicates().show()
# 指定重复判断的字段,如果age字段值相同,视为两行数据为重复数据
df_json.dropDuplicates(subset=['age']).show()
1.2.5.8. dropna
删除有缺失值的行,删除逻辑可以设置
# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 删除有缺失数据的行,默认规则:只要一行数据中有至少一列是null,就删除这一行
# df_json.dropna().show()
# 删除有效字段不到两列的数据(至少需要有两列不是null,否则就删除)
# df_json.dropna(thresh=2).show()
# 删除指定字段是null的行
df_json.dropna(subset=['name']).show()
1.2.5.9. fillna
填充缺失值数据
# 填充所有的null为 N/A
# 这里需要注意数据类型,在df_json中,age和height列分别为long和double类型,因此这个填充无法对这两个列生效
df_json.fillna("N/A").show()
# 填充指定列的null
df_json.fillna(0, subset=['age', 'height']).show()
# 使用指定规则进行填充
df_json.fillna({'age': 0, 'height': 0.0, 'name': '佚名', 'province': '未知省份'}).show()
1.2.6. SQL风格处理数据
在使用Spark SQL的时候,出了上述的DSL风格的方式之外,还有一种更为简单,也是在开发中使用的非常多的方式:SQL。
在SparkSQL部分,使用到的编程模型是DataFrame,相比较于RDD来说,DataFrame多出来了一个Schema信息,将结构化的数据抽象称为一张二维表格来处理。SparkSQL对这样的数据在处理的时候,可以直接使用SQL语句来做。
与Hive类似,SparkSQL可以将SQL语句解析成为SparkCore部分的RDD的处理逻辑,提交到Spark集群运行。因此我们在进行数据的处理的时候,可以直接使用SQL语句来完成。这里的SQL支持标准SQL语句、Hive SQL等。
# 在使用SQL风格的时候,首先要做的事情是注册一张虚拟表的名字。
# 在将DataFrame注册虚拟表的时候,有三个函数可以使用
# createTempView :注册一张虚拟表,如果这个表存在则报错。
# createOrReplaceTempView :注册一张虚拟表,如果这个表存在,则覆盖之前的表名。
# createGlobalTempView :注册一张全局的虚拟表。
# 前两个函数注册表,可以称为"临时表、局部表",这种表只能在当前的SparkSession会话环境中使用。
# 最后一个函数注册的表,可以称为"全局表",这种表可以跨SparkSession会话使用。
df_json.createOrReplaceTempView('people')
# 注册完成之后,就可以使用这个表名,使用SQL语句来处理了
# 案例1:统计成年人的数量
spark.sql("select count(*) from people where age >= 18").show()
# 案例2:统计每一个省份有多少人
spark.sql("select province, count(1) from people group by province having province != 'null'").show()
1.2.7. 数据落地
在上述的操作中,我们最终都只是将处理之后的数据打印到了控制台上。但是在实际的使用场景下,有时候我们需要将数据导出,形成一个数据文件。那么此时我们只需要修改一下最终要用到的程序的落地的函数即可。
1.2.7.1. 标准落地方式
# 标准落地数据的格式,默认是parquet格式的数据
df_json.write.save('./output/standard')
# 在向外写文件的时候,可以设置mode,参数为string类型
# append: 追加,如果指定的路径下有数据文件存在,本次的输出会追加到之前的数据文件的末尾
# overwrite: 覆盖,如果指定的路径下有数据文件,本次的输出会将之前的数据覆盖掉
# ignore: 忽略,如果指定的路径下有数据文件,本次的输出会被忽略掉,什么都都不做
# error: 异常,如果指定的路径下有数据文件,本次的输出会异常
df_json.write.mode("overwrite").save("./output/standard")
1.2.7.2. 落地JSON数据
# 将数据落地称为JSON格式的数据
df_json.write.json("./output/json")
1.2.7.3. 落地CSV数据
# 将数据落地成为CSV格式的数据
# header: 将DataFrame中的字段名序列到csv文件的首行
# sep: 指定字段之间的分隔符
df_json.write.csv("./output/csv", header=True, sep='|')
1.2.7.4. 落地Parquet数据
# 可以直接使用标准落地方式
df_json.write.save('./output/standard')
# 也可以直接使用parquet格式的数据
df_json.write.parquet('./output/parquet')
1.2.7.5. 落地orc数据
# 将数据落地成为orc格式的数据
df_json.write.orc('./output/orc')
1.3.7.6. 落地txt数据
# 将数据落地成为txt格式的数据
df_json.write.text("./output/txt")
1.3.7.7. 落地数据库数据
# 将DataFrame的数据通过JDBC写入到数据库中
# 写数据的时候的时候需要使用JDBC来完成,因此需要将JDBC的jar包放到本地spark目录下的jars文件夹下
# 如果需要将代码提交到集群上运行,需要将JDBC的jar包放到集群的Spark目录下的jars文件夹下
df_json.write.jdbc(url="jdbc:mysql://localhost:3306/pydb",
table="people",
properties={
"user": "root",
"password": "123456",
"driver": "com.mysql.cj.jdbc.Driver"
})