当前位置: 首页 > article >正文

Apache Spark_解决生产环境数据倾斜问题方案及思路

        问题背景

        在大数据处理中,数据倾斜是指在分布式计算中,某些节点处理的数据量远大于其他节点,导致计算资源的不均衡使用,从而影响整体的计算性能。本文将介绍如何使用Apache Spark(特别是Scala API)处理数据倾斜问题。

        我司在处理超大数据量级别数据时,经常出现数据倾斜的情况,因此基本上都是看yarn的执行情况,本来10分钟能执行完毕的,拖到半小时甚至一个小时的情况,甚至再有的数据两三个小时跑完的,半天都跑不完的情况。看Excutor往往卡在几个节点上的数据超大,比如几十个G,其他节点500多M左右这种。基本上就是数据倾斜了。

        针对以上场景,我进行数据倾斜的常用办法及解决办法。

数据倾斜导致的问题

  1. 资源利用不均衡:少数节点负载过高,而其他节点闲置或负载较低。

  2. 任务执行时间变长:负载高的节点需要处理更多数据,导致任务执行时间变长,成为整个作业的瓶颈。

  3. 内存不足或 OOM(Out of Memory):单个节点处理的数据量过大,可能导致内存不足,甚至触发 OOM 错误。

  4. 作业失败:如果倾斜严重,可能导致任务失败,进而导致整个作业失败。

如何判断数据倾斜?

  1. Spark UI 观察

    • 在 Spark UI 的 Stages 页面中,查看每个任务的执行时间。

    • 如果某些任务的执行时间明显比其他任务长,且输入数据量(Input Size)远大于其他任务,则可能存在数据倾斜。

    • 在 Executors 页面中,查看每个 Executor 的数据处理量,如果某些 Executor 处理的数据量远大于其他 Executor,则可能存在数据倾斜。

  2. 日志分析

    • 查看日志中是否有 OOM 错误或任务失败的信息。

    • 如果有大量任务在少数节点上失败或重试,则可能存在数据倾斜。

  3. 数据分布统计

    • 对数据进行统计,查看某些 key 的数据量是否远大于其他 key。

    • 例如,使用 groupByKey 或 countByKey 统计每个 key 的数据量。

数据倾斜的解决办法(针对我开头说的情况)

1. 数据重分区(Repartition)

1.如果数据倾斜是由于分区不均匀导致的,可以通过增加分区数来使数据分布更均匀。

// 增加分区数,这个你得看情况啊
val repartDataDF = dataDF.repartition(1000)

2. 随机前缀(Salting,也就是我们常说的加盐!!)

1.如果数据倾斜是由于某些 key 的数据量过大导致的,可以为这些 key 添加随机前缀,将数据分散到多个分区。

val saltedData = dataDF.map(row => {
  val key = row.getString(0)
  // 添加随机前缀
  val randomSuffix = scala.util.Random.nextInt(10) 
  (s"$key-$randomSuffix", row)
})

 3.广播变量(Broadcast Variables)

  1.如果数据倾斜是由于小表连接大表导致的,可以将小表广播到每个节点。

val broadcastSmallDataDF = spark.sparkContext.broadcast(smallDataDF.collect().toMap)

val resultDF = largeDataDF.mapPartitions(partition => {
  val smallDataMap = broadcastSmallData.value
  partition.map(row => (row, smallDataMap.get(row.getString(0))))
})

4.调整并行度

1.如果数据倾斜是由于分区数不足导致的,可以调整并行度。

//我一搬都是这么做的
//注意哈,分区数最好是2~3倍的ecutor个数乘以excuor核数
spark.conf.set("spark.default.parallelism", "1000")

完毕!当然,其他的小表广播也很常见,看生产实际用啥吧,后续再补充


http://www.kler.cn/a/589707.html

相关文章:

  • Tomcat新手入门指南:从零开始安装与基本配置
  • Python驱动CATIA自动化建模:科赫雪花算法实现与工程应用
  • 《解锁华为黑科技:MindSpore+鸿蒙深度集成奥秘》
  • 咪咕MG101_晨星MSO9380芯片_安卓5.1.1_免拆卡刷固件包
  • Bash语言的手动测试
  • Keepalived 多主模型与 LVS 高可用
  • 基于System V的共享内存函数使用指南
  • Flume详解——介绍、部署与使用
  • 2025深圳国际数字能源展全球招商启动,9月18日盛大开启
  • XGPT x DeepSeek:微步AI安全助手满血升级
  • 蓝桥杯备考-----》差分数组+二分答案 借教室
  • python局部变量和全局变量
  • nodejs 使用 puppeteer 打印PDF 有元素没有打印出来
  • 什么是广播系统语言传输指数 STIPA
  • 【云原生技术】容器技术的发展史
  • 市场监管总局升级12315平台 专项整治四大市场顽疾保障消费安全
  • Leetcode——151.反转字符串中的单词
  • 【Linux】Ext系列文件系统(上)
  • 机器人触觉的意义
  • 解决 openeuler 系统 docker 下载慢,docker 镜像加速