当前位置: 首页 > article >正文

Spark 的 Skew Join 详解

    Skew Join 是 Spark 中为了解决数据倾斜问题而设计的一种优化机制。数据倾斜是指在分布式计算中,由于某些 key 具有大量数据,而其他 key 数据较少,导致某些分区的数据量特别大,造成计算负载不均衡。数据倾斜会导致个别节点出现性能瓶颈,影响整个任务的完成时间。

    Skew Join 的优化机制在 Spark 中主要解决了 JOIN 操作中的数据倾斜问题。为了更好地理解 Skew Join 的原理和实现,我们需要从数据倾斜产生的原因、Spark 如何识别数据倾斜、以及 Skew Join 的优化策略和底层实现等方面来进行详细解释。

一、什么是数据倾斜

        数据倾斜指的是当某些 key 关联了异常大量的数据,而其他 key 关联的数据量较少时,数据分布的不均衡会导致计算瓶颈。例如,在 JOIN 操作中,如果表 A 中某个 key 具有大量的数据,而表 B 中同样的 key 也有大量数据,当这两个表基于这个 key 进行 JOIN 时,由于该 key 被分配到一个或少数几个分区,相关的任务会处理大量的数据,而其他分区的任务数据量却较少。这会导致部分任务比其他任务运行时间长,从而影响整个任务的执行时间。

二、Spark 中如何识别数据倾斜

        在执行 JOIN 操作时,Spark 会通过数据采样和统计信息来检测是否存在数据倾斜。Spark SQL 可以通过分析数据分布,计算每个 key 的数据量,当发现某些 key 占据了大量的行时,Spark 会将其标记为 "倾斜的 key"。对于这些倾斜的 key,Spark 会进行特殊处理,避免过度集中在某些分区中。

Spark 的 Skew Join 优化主要依赖于配置参数和数据采样来检测并处理这些倾斜的 key

检测数据倾斜的主要参数:
  • spark.sql.autoSkewJoin.enabled: 默认是 false,如果设置为 true,Spark 会自动检测和处理数据倾斜的 JOIN 操作。
  • spark.sql.skewJoin.threshold: 用来设定 Spark 如何判断某个分区是否倾斜。该参数设置的值是数据倾斜的阈值,通常是一个比例值,如果某个分区的数据量超过该比例值,则会被视为倾斜的分区。

三、Skew Join 的底层原理

        当 Spark 识别出 JOIN 中存在数据倾斜时,Skew Join 会将倾斜的 key 拆分成多个子任务分别处理。具体而言,Skew Join 的主要思想是将倾斜的 key 拆分到多个不同的分区,从而将任务的计算负载均匀分布,避免单个分区处理过多数据。

以下是 Skew Join 的执行流程:

  1. 普通的非倾斜 key 处理

    对于普通的非倾斜 keySkew Join 没有特别的处理方式,Spark 直接按照 key 进行 Shuffle,将数据发送到相应的分区,并进行 JOIN 操作。
  2. 倾斜的 key 处理

        对于检测到的倾斜 key,Spark 会进行特殊处理,具体步骤如下:

  • Spark 会将倾斜的 key 的数据进行重新分片,将大数据量的倾斜 key 拆分成多个子分区。
  • 然后对于每一个子分区,分别与另一个表中的对应数据进行 JOIN
  • 通过多次 JOIN 操作,将这些子分区结果合并为最终的 JOIN 输出结果。

     3. Hash Salt(哈希加盐)

        为了避免倾斜的 key 被集中到同一个分区,Spark 会通过对倾斜的 key 添加一个随机的 salt(盐值)来打散数据。具体来说,Spark 会将倾斜的 key 拆分成多个子 key,通过附加随机数(salt),使得这些子 key 被分布到不同的分区。

伪代码展示:
// 倾斜 key 的原始 join
tableA.join(tableB, "key")

// Skew Join 处理
val skewKeys = getSkewKeys()
for (skewKey <- skewKeys) {
  val saltedTableA = tableA.filter($"key" === skewKey).withColumn("salt", rand())
  val saltedTableB = tableB.filter($"key" === skewKey).withColumn("salt", rand())
  saltedTableA.join(saltedTableB, Seq("key", "salt"))
}

通过引入 salt,可以有效地将数据均匀分布到不同的分区,减少单个分区处理的数据量。

四、Skew Join 的源代码实现

        在 Spark SQL 中,Skew Join 是作为 PhysicalPlan 中 Join 的一个优化执行计划。关键类为 EnsureRequirements,其主要职责是对 Join 的物理计划执行前进行必要的调整,包括处理数据倾斜的 Skew Join 优化。

以下是 EnsureRequirements 中处理数据倾斜的相关部分源代码:

private def applySkewJoin(plan: SparkPlan): SparkPlan = plan match {
  case join @ ShuffledHashJoinExec(_, _, _, _, left, right) =>
    // 检查是否有数据倾斜
    if (isSkewed(join)) {
      // 处理 skew join,使用 hash salt 拆分倾斜的 key
      val skewJoin = handleSkewJoin(join)
      skewJoin
    } else {
      join
    }
  case other => other
}

        在 EnsureRequirements 中,applySkewJoin 函数会检测当前的 JOIN 是否存在数据倾斜问题。如果检测到数据倾斜,handleSkewJoin 函数会对数据进行处理,创建一个带有 salt 的 Skew Join 执行计划。

具体实现步骤:
  1. 检测数据倾斜isSkewed(join) 函数负责检测 JOIN 中的分区是否有数据倾斜。通常,通过采样和统计每个分区的数据量,来判断某个分区的数据量是否超出设定的阈值(spark.sql.skewJoin.threshold)。

  2. 处理倾斜数据handleSkewJoin(join) 函数是 Skew Join 的核心实现。它会通过对倾斜的 key 添加 salt 进行打散,使得数据均匀分布到多个子分区。

private def handleSkewJoin(join: ShuffledHashJoinExec): SparkPlan = {
  val skewKeys = getSkewKeys(join)
  val saltedLeft = splitAndSalt(join.left, skewKeys)
  val saltedRight = splitAndSalt(join.right, skewKeys)
  saltedLeft.join(saltedRight)
}

private def splitAndSalt(plan: SparkPlan, skewKeys: Seq[KeyType]): SparkPlan = {
  // 对每个倾斜 key 进行拆分并添加 salt
  plan.transform {
    case rdd: RDD[_] => 
      rdd.mapPartitionsInternal { iter =>
        iter.flatMap { row =>
          val key = getJoinKey(row)
          if (skewKeys.contains(key)) {
            val salt = Random.nextInt(numSplits) // 随机生成 salt
            Some((key, salt, row))
          } else {
            Some((key, row))
          }
        }
      }
  }
}

        在上面的代码中,splitAndSalt 函数将每个倾斜的 key 拆分成多个子 key,并为它们添加随机 salt,从而打散数据,均匀分布到不同的分区。

五、Skew Join 的优化策略

Spark 中 Skew Join 的优化需要考虑以下几个方面:

  1. 自动启用 Skew Join:通过设置 spark.sql.autoSkewJoin.enabled 为 true,Spark 会自动检测并处理倾斜的 JOIN 操作。对于那些倾斜的分区,Spark 会自动进行 Skew Join 优化。

  2. 调优 salt 值salt 的值影响了倾斜数据被打散的粒度。通过调节 salt 的随机范围,可以控制数据的打散程度。如果 salt 的范围太小,数据可能仍然集中在某些分区;如果范围太大,则可能会产生过多的小分区,导致计算开销增加。

  3. 采样优化:通过调整采样参数,Spark 可以更好地识别出数据倾斜的 key,从而提高 Skew Join 的处理效率。spark.sql.skewJoin.threshold 参数允许用户设定数据倾斜的阈值。

  4. 数据预处理:在某些场景中,用户可以通过在数据加载和预处理阶段手动解决数据倾斜问题。例如,用户可以通过聚合或者过滤数据的方式,减少倾斜 key 的数据量。

六、总结

    Skew Join 是 Spark 中为了解决数据倾斜问题而提供的一种重要优化机制。其核心思想是通过检测数据倾斜的 key,并对这些 key 进行分片和哈希加盐处理,使得倾斜的数据被均匀分布到不同的分区,从而避免计算负载的不均衡。通过 Skew Join,Spark 可以显著提高 JOIN 操作的性能,尤其是在数据倾斜严重的场景下。

合理的参数调优和数据预处理是确保 Skew Join 有效的关键。


http://www.kler.cn/news/324329.html

相关文章:

  • Spring Boot 2.4.3 + Java 8 升级为 Java 21 + Spring Boot 3.2.0
  • ubuntu 不用每次输入sudo的四种方式
  • 基于python+django+vue的电影数据分析及可视化系统
  • 滚雪球学MySQL[6.1讲]:数据备份与恢复
  • 初始MYSQL数据库(6)—— 事务
  • 什么东西可以当做GC Root,跨代引用如何处理?
  • 【LLM】从零预训练一个tiny-llama
  • python高级用法_装饰器
  • text2sql方法:NatSQL和DIN-SQL
  • 【Redis 源码】4adlist列表.md
  • 3. 轴指令(omron 机器自动化控制器)——>MC_MoveVelocity
  • 生物信息常用编辑器:轻量/强大/可定制/跨平台支持的编辑器之神 - vim
  • 前端开发设计模式——单例模式
  • golang 如何生成唯一的 UUID
  • jQuery——属性
  • Webpack教程-概述
  • CF补题第二天
  • 【C++篇】迈入新世界的大门——初识C++(上篇)
  • element下拉框联动 或 多选 回显数据后页面操作不生效问题解决
  • 汇编语言 访问CMOS RAM并打印时间(未完)
  • 6-演员和蓝图
  • 计算机毕业设计 基于Python的热门微博数据可视化分析系统的设计与实现 Python+Django+Vue 可视化大屏 附源码 讲解 文档
  • MySQL—触发器详解
  • vector的模拟实现以及oj题(2)
  • Linux —— Socket编程(二)
  • NetworkPolicy访问控制
  • Windows 开发工具使用技巧
  • PAT甲级1003Emergency
  • 【分布式微服务云原生】10分钟揭秘Dubbo负载均衡:如何让服务调用更智能?
  • 发明专利实用新型专利外观设计专利