当前位置: 首页 > article >正文

SparkSQL编程实践

文章目录

    • SparkSQL编程实践
      • 1.1. 编程模型介绍
      • 1.2. SparkSQL编程
        • 1.2.1. 第三方库安装
        • 1.2.2. SparkSQL程序的结构
        • 1.2.3. SparkSQL执行模式
          • 1.2.3.1. Local模式
          • 1.2.3.2. 集群模式
        • 1.2.4. 数据加载
          • 1.2.4.1. 通过RDD创建DataFrame
          • 1.2.4.2. 通过读取数据外部数据创建DataFrame
            • 标准读取方式
            • 读取JSON数据
            • 读取CSV数据
            • 读取Parquet数据
            • 读取orc数据
            • 读取txt数据
            • 读取数据库数据
        • 1.2.5. DSL风格处理数据
          • 1.2.5.1. printSchema
          • 1.2.5.2. show
          • 1.2.5.3. filter
          • 1.2.5.4. where
          • 1.2.5.5. select
          • 1.2.5.6. groupBy
          • 1.2.5.7. dropDuplicates
          • 1.2.5.8. dropna
          • 1.2.5.9. fillna
        • 1.2.6. SQL风格处理数据
        • 1.2.7. 数据落地
          • 1.2.7.1. 标准落地方式
          • 1.2.7.2. 落地JSON数据
          • 1.2.7.3. 落地CSV数据
          • 1.2.7.4. 落地Parquet数据
          • 1.2.7.5. 落地orc数据
          • 1.3.7.6. 落地txt数据
          • 1.3.7.7. 落地数据库数据

SparkSQL编程实践

1.1. 编程模型介绍

在SparkCore中使用到的编程模型是RDD。RDD是一种弹性分布式数据集,是对我们需要处理的数据进行的抽象。其中可以存储任意的数据,没有标准的数据结构。

image-20230207151820022

image-20230207151927965

而SparkSQL使用到的编程模型是DataFrame,是在RDD的基础上添加了Schema信息。所谓的Scheme信息指的是描述数据的信息,也可以认为是“元数据”,DataFrame的前身就是SchemaRDD。

假设RDD中的几行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

image-20230207154935852

其实SparkSQL中的DataFrame是借鉴了Python的Pandas库中的DataFrame的思想而来的,都是将结构化的数据映射成为一张二维的表格对数据进行处理。只不过Pandas中的DataFrame只能够处理本地的一些数据文件,而SparkSQL中的DataFrame却是能够处理分布式数据集合的编程模型。

1.2. SparkSQL编程

1.2.1. 第三方库安装

Spark本身是使用Scala语言来编写的,原生支持Scala、Java编程语言。而我们现在需要使用Python来进行操作,就需要下载安装第三方库pyspark,这个库是Spark官方发布的一个专门使用Python来操作Spark的库,我们直接使用pip就可以安装。

pip install pyspark==3.1.2

**注意:**在安装的时候,一定要与你的Spark的版本是一致的,否则会出现兼容性的问题。

1.2.2. SparkSQL程序的结构

在SparkSQL的程序中,程序的入口是SparkSession对象。在上方的Spark任务提交的部分,提到过一个比较重要的类是SparkContext,而在SparkSession中,对SparkContext进行了封装。

# 导入依赖库
from pyspark.sql import SparkSession

# 创建程序入口SparkSession对象
#   .builder : 获取到SparkSessionBuilder对象,用于创建SparkSession对象
#   .master : 设置Master是谁,可以设置为local表示本地模式
#   .appName : Spark程序的名字
#   .getOrCreate : 根据前面设置的属性,用来创建一个SparkSession对象
spark = SparkSession.builder.master("local[*]").appName("spark-sql").getOrCreate()

# 使用SparkSQL对数据进行处理的过程

# 在程序结束的时候,释放SparkSession对象,释放资源
spark.stop()
1.2.3. SparkSQL执行模式
1.2.3.1. Local模式

将master设置为local,即可开启local模式,使用这种模式进行程序的开发。在local模式下,一般用来做程序的测试的用途。

对于分布式计算程序的开发,local模式其实是必不可少的。我们需要在开发的过程中,不断的使用测试数据来测试我们的业务逻辑是否正确,对程序进行及时的调整。此时一般会使用与真实的数据相同格式的、数据量比较少的测试数据,来测试程序的正确性。测试通过之后,才会放到集群上使用Standalone或者YARN模式来运行。

在这种模式下,local有几种写法:

  • local : 开启一个线程,相当于local[1]。
  • local[N] : 自己设定线程的数量为N个,例如local[2]就表示使用2个线程。
  • local[*] : 按照CPU的核数来设置线程数量。

同时,如果要使用local模式的话,需要在本地安装好Spark的本地环境。其实就是将Spark的安装包在本地解压,然后配置环境变量即可。因为Spark的程序在执行的时候,其实是需要使用到spark-submit这个脚本的。

1.2.3.2. 集群模式

如果想要将自己的代码提交到集群上,以Standalone或者YARN模式来执行的话,只需要将代码中SparkSession在创建时候的.master去掉即可。以Standalone模式或者YARN模式来提交任务即可,参考Spark任务提交的部分。

在后续的课程中,默认都是以local模式来运行程序。因为在local模式下,可以快速的对程序进行测试,快速的定位问题、解决问题。如果需要提交到集群运行,按照上述方式修改提交方式即可。

1.2.4. 数据加载

在我们使用到SparkSQL的时候,需要使用SparkSession作为程序的入口,同时以DataFrame作为我们的编程模型。其实就是使用DataFrame来抽象描述我们需要处理的数据。那么DataFrame是怎么创建出来的呢?

  • 通过RDD对象进行创建。
  • 通过读取数据文件进行创建。
1.2.4.1. 通过RDD创建DataFrame

RDD是SparkCore中的编程模型,虽然我们不使用SparkCore来进行编程开发,但是还是有必要了解一下如何通过RDD进行DataFrame的构建。通过RDD创建DataFrame有两种方式:

  • <SparkSession对象>.createDataFrame()
  • <RDD对象>.toDF()
# 1. 构建RDD
rdd = spark.sparkContext.parallelize([
    ("xiaoming", 19, 99, "male"),
    ("xiaohong", 20, 98, "female"),
    ("xiaogang", 20, 89, "male"),
    ("xiaojuan", 21, 88, "female")
])

# 2. 通过RDD构建DataFrame,构建方式:createDataFrame
'''
# 直接通过createDataFrame创建DataFrame
df = spark.createDataFrame(rdd)
'''

df = rdd.toDF()

df.printSchema()
df.show()

但是这样创建出来的DataFrame对象的Schema信息中,每一个字段的类型可以由值进行推导,但是字段的名字无法手动定义,只能使用默认的_0、_1等来描述,可读性非常差。因此我们就可以在创建DataFrame的时候,同时设定表头信息。

  • **使用数组的方式:**可以设置字段的名字,类型是由值来进行推导。

    # 通过RDD构建DataFrame,通过数组,定义表头
    df1 = spark.createDataFrame(rdd, schema=['name', 'age', 'score', 'gender'])
    df2 = rdd.toDF(schema=['name', 'age', 'score', 'gender'])
    
    """
    root
     |-- name: string (nullable = true)
     |-- age: long (nullable = true)
     |-- score: long (nullable = true)
     |-- gender: string (nullable = true)
    
    +--------+---+-----+------+
    |    name|age|score|gender|
    +--------+---+-----+------+
    |xiaoming| 19|   99|  male|
    |xiaohong| 20|   98|female|
    |xiaogang| 20|   89|  male|
    |xiaojuan| 21|   88|female|
    +--------+---+-----+------+
    """
    
  • **使用StructTypes:**可以设置字段的名字、类型等属性。

    # StductTypes需要导入模块使用
    from pyspark.sql.types import StructType, StringType, IntegerType
    
    # 实例化Schema信息
    schema = StructType()\
        .add('name', StringType(), True)\
        .add('age', IntegerType(), True)\
        .add('score', IntegerType(), True)\
        .add('gender', StringType(), True)
    
    # 2. 通过RDD构建DataFrame,通过数组,定义表头
    df1 = spark.createDataFrame(rdd, schema=schema)
    df2 = rdd.toDF(schema=schema)
    
1.2.4.2. 通过读取数据外部数据创建DataFrame
标准读取方式
# 加载数据的标准流程,是使用load函数来加载。此时默认加载的是parquet格式的数据。
df_parquet = spark.read.load("../../../data/users.parquet")
# 如果需要加载其他格式的数据,需要指定format格式。
df_json = spark.read.load("../../../data/account.json", format="json")
df_csv = spark.read.load("../../../data/country.csv", format="csv")
读取JSON数据
# 读取JSON格式的数据,读取JSON数据创建的DataFrame对象自带Schema信息
df_json = spark.read.json("../../../data/account.json")
读取CSV数据
# 读取CSV格式的数据,CSV在读取的时候,可以设置分隔符的格式和是否包含头部信息
# 分隔符:CSV文件中,默认的列分隔符是逗号,也可以替换成为其他的分隔符
# 头部信息:第一行的数据是否作为字段的Schema的名字,如果不包含,字段的名字默认为_c0, _c1...
df_csv = spark.read.csv("../../../data/ip-pprovince-count.csv", sep=";", header=True)
读取Parquet数据
# 读取Parquet格式的数据
df_parquet = spark.read.parquet("../../../data/users.parquet")
读取orc数据
# 读取orc格式的数据
df_orc = spark.read.orc("../../../data/student.orc")
读取txt数据
# 读取txt格式的数据,这种方式读取的数据,只能将一行的数据读成一列
df_txt = spark.read.text("../../../data/dailykey.txt")
读取数据库数据
# 读取数据库的数据
# 读取的时候需要使用JDBC来读取,因此需要将JDBC的jar包放到本地spark目录下的jars文件夹下
# 如果需要将代码提交到集群上运行,需要将JDBC的jar包放到集群的Spark目录下的jars文件夹下
df_jdbc = spark.read.jdbc(url="jdbc:mysql://localhost:3306/pydb",
                        table="bank_account",
                        properties={
                            "user": "root",
                            "password": "123456",
                            "driver": "com.mysql.cj.jdbc.Driver"
                        })
1.2.5. DSL风格处理数据
函数函数描述
printSchema()打印DataFrame的Schema信息
show()打印参数数量指定的行数据,默认20行
filter()对DataFrame中的数据进行过滤,保留满足条件的数据,等同于where()
where()对DataFrame中的数据进行过滤,保留满足条件的数据,等同于filter()
select()查询DataFrame中指定的列
groupBy()对DataFrame的数据,按照指定的字段进行分组
dropDuplicates()对DataFrame的数据进行去重,保留重复的第一条数据
dropna()删除有缺失值的行,删除逻辑可以设置
fillna()填充缺失值数据
1.2.5.1. printSchema

DataFrame将结构化的数据抽象成为一张表,使用printSchema打印这个虚拟的表的结构。

可以查看到“字段”的名字和类型。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 打印Schema
df_json.printSchema()
1.2.5.2. show

打印DataFrame描述的若干行的数据,默认打印20行。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 默认打印20行的数据
df_json.show()
# 打印5行数据
df_json.show(5)
1.2.5.3. filter

对DataFrame的数据按照条件进行过滤,保留满足条件的数据,删除不满足条件的数据。得到一个新的DataFrame。

filter的参数可以是一个字符串的判断条件,也可以是Column对象的比较。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")

# 使用字符串描述规则
df_json.filter("age > 18").show()
# Column:是一个用来描述DataFrame中的"列、字段"的类,使用df_json['字段名']来获取
df_json.filter(df_json['age'] > 19).show()
1.2.5.4. where

等同于filter。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")

# 使用字符串描述规则
df_json.where("age > 18").show()
# Column:是一个用来描述DataFrame中的"列、字段"的类,使用df_json['字段名']来获取
df_json.where(df_json['age'] > 19).show()
1.2.5.5. select

查询DataFrame中的指定的字段。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")

# select的参数可以是字符串,也可以是Column对象
# 使用字符串参数
df_json.select('name', 'age').show()

# 使用Column对象作为参数
df_json.select(df_json['name'], df_json['age']).show()

# 如果需要对查询的字段进行操作,例如运算、别名等,必须要使用Column对象
df_json.select((df_json['age'] + 10).alias('new_age')).show()
1.2.5.6. groupBy

对DataFrame的数据,按照指定的字段进行分组,返回值是一个GroupedData。

GroupedData对象是一个特殊的DataFrame,是对DataFrame的数据进行分组之后的结果,其中记录了DataFrame中的数据分组之后形成的数据。对GroupedData对象,我们可以使用一些聚合函数来操作,例如max、min、count、sum、avg等。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")
# 按照省份进行分组
df = df_json.groupby('province')
# 每个省份的最大年龄
df.max('age').show()
# 每个省份的最小身高
df.min('height').show()
# 每个省份有多少人
df.count().show()
# 每个省份的总年龄
df.sum('age').show()
# 每个省份的平均身高
df.avg('height').show()
1.2.5.7. dropDuplicates

对DataFrame的数据进行去重,保留重复的第一条数据。

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")

# 去重,保留重复的第一条数据(每一列的值都得相同)
df_json.dropDuplicates().show()
# 指定重复判断的字段,如果age字段值相同,视为两行数据为重复数据
df_json.dropDuplicates(subset=['age']).show()
1.2.5.8. dropna

删除有缺失值的行,删除逻辑可以设置

# 读取本地JSON文件,创建DataFrame
df_json = ssc.read.json("../../../data/people.json")

# 删除有缺失数据的行,默认规则:只要一行数据中有至少一列是null,就删除这一行
# df_json.dropna().show()

# 删除有效字段不到两列的数据(至少需要有两列不是null,否则就删除)
# df_json.dropna(thresh=2).show()

# 删除指定字段是null的行
df_json.dropna(subset=['name']).show()
1.2.5.9. fillna

填充缺失值数据

# 填充所有的null为 N/A
# 这里需要注意数据类型,在df_json中,age和height列分别为long和double类型,因此这个填充无法对这两个列生效
df_json.fillna("N/A").show()

# 填充指定列的null
df_json.fillna(0, subset=['age', 'height']).show()

# 使用指定规则进行填充
df_json.fillna({'age': 0, 'height': 0.0, 'name': '佚名', 'province': '未知省份'}).show()
1.2.6. SQL风格处理数据

在使用Spark SQL的时候,出了上述的DSL风格的方式之外,还有一种更为简单,也是在开发中使用的非常多的方式:SQL。

在SparkSQL部分,使用到的编程模型是DataFrame,相比较于RDD来说,DataFrame多出来了一个Schema信息,将结构化的数据抽象称为一张二维表格来处理。SparkSQL对这样的数据在处理的时候,可以直接使用SQL语句来做。

与Hive类似,SparkSQL可以将SQL语句解析成为SparkCore部分的RDD的处理逻辑,提交到Spark集群运行。因此我们在进行数据的处理的时候,可以直接使用SQL语句来完成。这里的SQL支持标准SQL语句、Hive SQL等。

# 在使用SQL风格的时候,首先要做的事情是注册一张虚拟表的名字。
# 在将DataFrame注册虚拟表的时候,有三个函数可以使用
# 	createTempView :注册一张虚拟表,如果这个表存在则报错。
# 	createOrReplaceTempView :注册一张虚拟表,如果这个表存在,则覆盖之前的表名。
#	createGlobalTempView :注册一张全局的虚拟表。
# 前两个函数注册表,可以称为"临时表、局部表",这种表只能在当前的SparkSession会话环境中使用。
# 最后一个函数注册的表,可以称为"全局表",这种表可以跨SparkSession会话使用。
df_json.createOrReplaceTempView('people')

# 注册完成之后,就可以使用这个表名,使用SQL语句来处理了
# 案例1:统计成年人的数量
spark.sql("select count(*) from people where age >= 18").show()
# 案例2:统计每一个省份有多少人
spark.sql("select province, count(1) from people group by province having province != 'null'").show()

1.2.7. 数据落地

在上述的操作中,我们最终都只是将处理之后的数据打印到了控制台上。但是在实际的使用场景下,有时候我们需要将数据导出,形成一个数据文件。那么此时我们只需要修改一下最终要用到的程序的落地的函数即可。

1.2.7.1. 标准落地方式
# 标准落地数据的格式,默认是parquet格式的数据
df_json.write.save('./output/standard')

# 在向外写文件的时候,可以设置mode,参数为string类型
#   append: 追加,如果指定的路径下有数据文件存在,本次的输出会追加到之前的数据文件的末尾
#   overwrite: 覆盖,如果指定的路径下有数据文件,本次的输出会将之前的数据覆盖掉
#   ignore: 忽略,如果指定的路径下有数据文件,本次的输出会被忽略掉,什么都都不做
#   error: 异常,如果指定的路径下有数据文件,本次的输出会异常
df_json.write.mode("overwrite").save("./output/standard")
1.2.7.2. 落地JSON数据
# 将数据落地称为JSON格式的数据
df_json.write.json("./output/json")
1.2.7.3. 落地CSV数据
# 将数据落地成为CSV格式的数据
# header: 将DataFrame中的字段名序列到csv文件的首行
# sep: 指定字段之间的分隔符
df_json.write.csv("./output/csv", header=True, sep='|')
1.2.7.4. 落地Parquet数据
# 可以直接使用标准落地方式
df_json.write.save('./output/standard')
# 也可以直接使用parquet格式的数据
df_json.write.parquet('./output/parquet')
1.2.7.5. 落地orc数据
# 将数据落地成为orc格式的数据
df_json.write.orc('./output/orc')
1.3.7.6. 落地txt数据
# 将数据落地成为txt格式的数据
df_json.write.text("./output/txt")
1.3.7.7. 落地数据库数据
# 将DataFrame的数据通过JDBC写入到数据库中
# 写数据的时候的时候需要使用JDBC来完成,因此需要将JDBC的jar包放到本地spark目录下的jars文件夹下
# 如果需要将代码提交到集群上运行,需要将JDBC的jar包放到集群的Spark目录下的jars文件夹下
df_json.write.jdbc(url="jdbc:mysql://localhost:3306/pydb",
                   table="people",
                   properties={
                       "user": "root",
                       "password": "123456",
                       "driver": "com.mysql.cj.jdbc.Driver"
                   })

http://www.kler.cn/a/429595.html

相关文章:

  • Java中的注解:如何自定义注解并实现功能
  • 【MySQL学习笔记】MySQL视图View
  • Vue3组件设计模式:高可复用性组件开发实战
  • 手撕代码: C++实现按位序列化和反序列化
  • R.swift库的详细用法
  • 欧拉路径算法
  • ollama-webui - Ollama的ChatGPT 风格的 Web 界面
  • 从零开始的使用SpringBoot和WebSocket打造实时共享文本应用
  • Rust 内置数据结构——BTreeMap应用教程
  • 【教学类-82-01】20241209涂色手表制作1.0(表盘、表带)
  • 基于STM32的手势电视机遥控器设计
  • 使用pyspark完成wordcount案例
  • Flutter 图片编辑板(二) 拖动位置和对齐线应用
  • 封闭式论文写作--全面掌握ChatGPT-4o的写作技能,掌握提示词使用技巧、文献检索与分析方法,帮助您选定研究方向,提炼学术论文题目
  • 软件漏洞印象
  • 网络安全 - Cross-site scripting
  • 刷leetcodehot100-7动态规划
  • 【RBF SBN READ】hadoop社区基于RBF的SBN READ请求流转
  • 产品经理的财会知识课:资产的减值测试
  • X推出新AI图像生成器Aurora:更接近真实的创作效果
  • Facebook与Web3的结合:去中心化社交的可能性
  • 【go】fmt包讲解与案例
  • C语言实例_27之删除字符串中指定字符
  • 出海服务器可以用国内云防护吗
  • React废弃componentWillMount和componentWillReceiveProps这两个生命周期方法
  • 【优选算法篇】:双指针算法--开启高效编码的两把“魔法指针”,练习题演练