当前位置: 首页 > article >正文

PySpark用sort-merge join解决数据倾斜的完整案例

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")

# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")

# 先对连接键进行排序,为sort-merge join做准备

sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()

# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")

print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。

http://www.kler.cn/a/503404.html

相关文章:

  • 二级C语言 2025/1/14
  • unity打包sdk热更新笔记
  • Docker Desktop 构建java8基础镜像jdk安装配置失效解决
  • 【机器学习】主动学习-增加标签的操作方法-流式选择性采样(Stream-based selective sampling)
  • C++实现设计模式---原型模式 (Prototype)
  • B+树的原理及实现
  • B3DM转换成FBX
  • Pgsql存储占用分析
  • AR 在高校实验室安全教育中的应用
  • 基于PHP的校园兼职系统设计和开发
  • 【Vue】Vue组件--上
  • [读书日志]从零开始学习Chisel 第十三篇:Scala的隐式参数与隐式转换(敏捷硬件开发语言Chisel与数字系统设计)
  • OLED显示字符
  • 八 rk3568 android11 AP6256 蓝牙调试
  • 网络安全之sql注入
  • 渐变头像合成网站PHP源码
  • YOLOv11实战行人跌倒识别
  • 学习笔记-Kotlin
  • ExplaineR:集成K-means聚类算法的SHAP可解释性分析 | 可视化混淆矩阵、决策曲线、模型评估与各类SHAP图
  • 【机器学习】实战:天池工业蒸汽量项目(一)数据探索
  • 【案例81】NMC调用导致数据库的效率问题
  • 深度学习中的学习率调度器(scheduler)分析并作图查看各方法差异
  • 【CI/CD构建】关于不小心将springMVC注解写在service层
  • 利用Python爬虫按图搜索1688商品(拍立淘):开启智能购物新体验
  • RIP协议在简单网络架构的使用
  • 工具推荐:PDFgear——免费且强大的PDF编辑工具 v2.1.12