当前位置: 首页 > article >正文

spark.sql

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 初始化 SparkSession 对象
spark = SparkSession.builder \
    .appName("Example PySpark Script with TempView and SQL") \
    .getOrCreate()

# 定义数据结构
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# 创建第一个 DataFrame
data1 = [
    ("Alice", 34, "New York"),
    ("Bob", 45, "Los Angeles"),
    ("Cathy", 29, "San Francisco"),
    ("David", 32, "Chicago"),
    ("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)

# 创建第二个 DataFrame
data2 = [
    ("Frank", 30, "New York"),
    ("Grace", 38, "Los Angeles"),
    ("Hannah", 25, "San Francisco"),
    ("Ian", 42, "Chicago"),
    ("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)

# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()

# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)

# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(
    count("name").alias("count"),
    mean("age").alias("avg_age")
)

grouped_df2 = filtered_df2.groupBy("city").agg(
    count("name").alias("count"),
    mean("age").alias("avg_age")
)

# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)

# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)

# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)

# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))

# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")

# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""

# 执行 SQL 查询
sql_results = spark.sql(sql_query)

# 显示结果
sql_results.show(truncate=False)

# 关闭 SparkSession
spark.stop()

在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。

我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。

代码解释

  1. 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
  2. 使用 filter:过滤符合条件的记录。
  3. 使用 group by:按字段进行分组聚合。
  4. 使用 union:将两个 DataFrame 合并。
  5. 使用 sample:从 DataFrame 中随机抽取样本。
  6. 使用 limit:限制结果集的大小。
  7. 使用窗口函数:添加窗口函数来执行复杂的分析。
  8. 使用 createOrReplaceTempView:注册临时视图。
  9. 使用 SQL 查询:执行 SQL 查询。 

在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:

  1. SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。

  2. Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。

  3. 数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。

  4. 执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。

  5. 数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。

优化建议

为了提高 SQL 查询的性能,可以考虑以下几个优化策略:

  1. 减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。

  2. 缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。

  3. 使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。

  4. 索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。

  5. 调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。


http://www.kler.cn/a/301110.html

相关文章:

  • 图像处理学习笔记-20241118
  • Nuxt3 动态路由URL不更改的前提下参数更新,NuxtLink不刷新不跳转,生命周期无响应解决方案
  • 湘潭大学软件工程算法设计与分析考试复习笔记(三)
  • 量化交易系统开发-实时行情自动化交易-4.1.3.A股平均趋向指数(ADX)实现
  • 软件测试基础三十 (Python + Flask实现Mock平台搭建)
  • java语言特性(持续更新)
  • FaskAPI Web学习
  • 动态规划算法之斐波那契数列详细解读(附带Java代码解读)
  • 陈坤2024行走的力量 走向山野感受距离自然更近的地方
  • 9月→2024年计算机与信息安全国际会议
  • 如何读.Net Framework 的源码?
  • 观众登记2025中国(深圳)国际智能手机供应链展览会
  • 数据分析与挖掘课程相关资源
  • 使用 Python 读取 Excel 数据的详细教程
  • 【C++】关键字、命名空间、输入和输出、缺省参数的深入了解
  • Flutter 使用第三方包加载3d模型
  • SpringTest框架JUnit单元测试用例获取ApplicationContext实例的方法
  • 【数据结构-一维差分】力扣1854. 人口最多的年份
  • 陪玩小程序源码搭建,基于PHP+MySQL陪玩系统app源码
  • 解码未来:H.265与H.266技术对比及EasyCVR视频汇聚平台编码技术优势
  • 工具篇之Apache Commons
  • LeetCode HOT100系列题解之数组中的第K个最大元素(7/100)
  • 【Python系列】理解 Python 中的时间和日期处理
  • 汽车智能座舱展︱2025 广州国际汽车智能座舱及车载显示技术展览会
  • python绘制3D瀑布图
  • springboot体会BIO(阻塞式IO)