当前位置: 首页 > article >正文

spark3 sql优化:同一个表关联多次,优化方案

目录

  • 1.合并查询
  • 2.使用 JOIN 条件的过滤优化
  • 3.使用 Map-side Join 或 Broadcast Join
  • 4.使用 Partitioning 和 Bucketing
  • 5.利用 DataFrame API 进行优化
  • 假设 A 和 B 已经加载为 DataFrame
  • Perform left joins with specific conditions
  • 6.使用缓存或持久化
  • 7.避免笛卡尔积
  • 总结

1.合并查询

如果在 SQL 中的多个 JOIN 操作是针对同一个表,只是条件不同,可以考虑将条件合并成一个查询,从而减少对同一表的多次扫描。例如,将多个 LEFT JOIN 转换成一个 JOIN,使用 CASE 或 FILTER 直接处理不同的关联条件。

优化前:

SELECT 
    A.id, 
    A.col1, 
    A.col2, 
    B1.col3 AS B1_col3, 
    B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

优化后:

SELECT 
    A.id, 
    A.col1, 
    A.col2, 
    MAX(CASE WHEN A.col1 = B.col4 THEN B.col3 END) AS B1_col3,
    MAX(CASE WHEN A.col2 = B.col4 THEN B.col3 END) AS B2_col3
FROM A
LEFT JOIN B ON A.id = B.id
GROUP BY A.id, A.col1, A.col2

2.使用 JOIN 条件的过滤优化

通过精简 JOIN 条件,尽量减少连接的行数。例如,如果 B 表中有索引列,可以直接根据索引列做筛选,而不依赖复杂的条件。

假设对 B 表进行的连接条件中,有部分条件可以通过过滤的方式提前应用,比如通过 WHERE 子句或者 JOIN 之前的 FILTER。

SELECT 
    A.id, 
    A.col1, 
    A.col2, 
    B1.col3 AS B1_col3, 
    B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4
WHERE B1.col3 IS NOT NULL OR B2.col3 IS NOT NULL

3.使用 Map-side Join 或 Broadcast Join

在 Spark SQL 中,当其中一个表(比如 A)较小且能完全加载到内存时,Spark 会自动选择广播连接,即将小表广播到所有工作节点进行连接计算,而不是进行全表扫描。

如果你知道某个表的规模较小(例如 A),可以手动启用广播连接,减少 shuffle 的开销。

SELECT /*+ BROADCAST(A) */
    A.id, 
    A.col1, 
    A.col2, 
    B1.col3 AS B1_col3, 
    B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

在这里,通过 /*+ BROADCAST(A) */ 强制 Spark 将 A 表广播到各个执行节点,从而避免了对大表 B 进行多次 shuffle。

广播条件:
A 表要相对较小,可以完全加载到内存中。
B 表较大,且 A 表的行数远小于 B。

4.使用 Partitioning 和 Bucketing

在分布式环境下,通过合理的分区和分桶设计,可以减少 JOIN 时的 shuffle 开销。尤其是对于大表,可以考虑对 A 或 B 表做分区(PARTITION BY)或分桶(BUCKET BY)。
– 对 B 表进行分桶(根据 id 或其他相关字段)

CREATE TABLE B (
    id INT,
    col3 STRING,
    col4 STRING
)
USING parquet
CLUSTERED BY (id) INTO 10 BUCKETS;

通过将表按某个字段进行分桶,Spark 在进行连接时能够减少数据的移动和重新分配。

5.利用 DataFrame API 进行优化

如果 SQL 性能不够高,可以尝试将查询转为 DataFrame API 编写,Spark DataFrame API 可能在某些复杂的连接和查询场景下更加高效。

假设 A 和 B 已经加载为 DataFrame

from pyspark.sql import functions as F

Perform left joins with specific conditions

df_A = spark.table("A")
df_B = spark.table("B")

df_B1 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")
df_B2 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")

df_result = df_A.join(df_B1, (df_A.id == df_B1.id) & (df_A.col1 == df_B1.col4), "left") \
                .join(df_B2, (df_A.id == df_B2.id) & (df_A.col2 == df_B2.col4), "left") \
                .select(df_A.id, df_A.col1, df_A.col2, df_B1.col3.alias("B1_col3"), df_B2.col3.alias("B2_col3"))

df_result.show()

DataFrame API 可以对复杂的 JOIN 和条件执行更多优化,比如延迟执行和缓存策略。

6.使用缓存或持久化

如果你在多次查询中重复使用某些中间结果(例如对 B 表的过滤结果或计算结果),可以选择缓存或持久化某些 DataFrame。

df_B1_cached = df_B1.cache()
df_B2_cached = df_B2.cache()

df_result = df_A.join(df_B1_cached, (df_A.id == df_B1_cached.id) & (df_A.col1 == df_B1_cached.col4), "left") \
                .join(df_B2_cached, (df_A.id == df_B2_cached.id) & (df_A.col2 == df_B2_cached.col4), "left")

缓存对于反复使用的子查询可以减少重新计算的开销。

7.避免笛卡尔积

笛卡尔积会导致非常高的计算开销和内存占用,因此在 JOIN 时需要确保条件足够明确,避免无条件的多表连接。你可以使用 EXPLAIN 来分析查询计划,检查是否出现了笛卡尔积。
查询计划中的 CartesianProduct 或 CROSS JOIN

EXPLAIN SELECT A.id, A.col1, A.col2, B.col3 FROM A JOIN B ON A.id = B.id

Spark SQL / Hive 中,查询计划可能会显示 CartesianProduct 或类似的描述,指明两张表间进行了笛卡尔积连接。

== Physical Plan ==
CartesianProduct(0)

PostgreSQL、MySQL 等关系型数据库,通常会标明连接类型。如果执行计划中显示了 CROSS JOIN,则明确表示笛卡尔积。

->  Seq Scan on table_a  (cost=0.00..10.00 rows=100 width=20)
->  Seq Scan on table_b  (cost=0.00..10.00 rows=100 width=20)
->  Hash Join  (cost=200.00..220.00 rows=1000 width=100)

如果这里显示了 CROSS JOIN,就意味着没有任何连接条件,导致笛卡尔积的生成。

通过查看执行计划(EXPLAIN)了解是否存在不必要的全表扫描。

总结

合并查询: 用 CASE WHEN 合并多个 JOIN。
简化 JOIN 条件: 提前通过 WHERE 子句过滤无效数据。
广播连接: 对小表使用 BROADCAST,减少 shuffle 开销。
分区和分桶: 对大表进行分区或分桶优化 JOIN 性能。
使用 DataFrame API: 在某些复杂查询中,DataFrame API 性能更优。
缓存数据: 重复使用的数据可以进行缓存或持久化。
避免笛卡尔积: 确保 JOIN 有明确的条件,避免全表扫描。


http://www.kler.cn/a/427415.html

相关文章:

  • Web安全深度剖析
  • URL访问网址的全过程
  • [C#]利用opencvsharp 已知原图和mask掩码图像,抠出原图中人物,背景设置为透明色
  • 方案拆解 | 打击矩阵新规频出!2025矩阵营销该怎么玩?
  • 蓝桥杯2117砍竹子(简单易懂 包看包会版)
  • 常见限流算法介绍 和 Spring Cloud Sentinel使用方式
  • 企业级资源监控方案落地:Prometheus+Grafana+Export
  • 代码随想录-算法训练营day35(贪心算法05:无重叠区间,划分字母区间,合并区间)
  • oracle 数组分组
  • 电子应用设计方案-43:智能手机充电器系统方案设计
  • node.js常用的模块和中间件?
  • DAY168内网对抗-基石框架篇单域架构域内应用控制成员组成用户策略信息收集环境搭建
  • RBA评分等级和标准
  • Oracle系统性能监控工具oswatcher演示
  • 通过 FRP 实现 P2P 通信:控制端与被控制端配置指南
  • Spring Boot接口返回统一格式
  • stm32中的常用函数
  • Java基础-异常
  • 【Vue3】 vant4 + Uploader组件上传文件
  • C++设计模式(建造者、中介者、备忘录)