当前位置: 首页 > article >正文

Spark 中分区相关设置

import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object partitionsobject {
  def main(args: Array[String]): Unit = {
    //配置运行环境
    val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").
      setMaster("local[*]").     
      set("spark.default.parallelism","10");   
    val sc: SparkContext = new spark.SparkContext(conf);
    //创建RDD
    val linesRdd =sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),3); 
    linesRdd.repartition(1);
    linesRdd.foreach(num => {
      println(num + " is in partition: " + TaskContext.getPartitionId());
    });
  }
}

分析一下这段代码中

setMaster("local[*]")、

set("spark.default.parallelism","10")、

sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),3)、

linesRdd.repartition(1)这几个设置的优先级,以及数据最后的分区数?

在这段代码中,涉及到了多个与分区数相关的设置,包括 setMasterspark.default.parallelismparallelize 的分区参数以及 repartition 方法。我们需要分析这些设置的优先级,以及最终 RDD 的分区数。


1. 设置分析

(1) setMaster("local[*]")
  • 这个设置指定了 Spark 应用程序运行在本地模式,[*] 表示使用本地机器的所有可用 CPU 核心。

  • 这个设置本身不会直接影响分区数,但它会影响集群的资源分配和并行度。

(2) set("spark.default.parallelism", "10")
  • 这个设置指定了 Spark 的默认并行度为 10

  • spark.default.parallelism 是 Spark 的一个全局配置参数,用于控制 RDD 的默认分区数。

  • 如果没有显式指定分区数(例如在 parallelize 或 repartition 中),Spark 会使用这个值作为默认的分区数。

(3) sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15), 3)
  • 这里显式地指定了分区数为 3

  • 在 parallelize 方法中,如果显式指定了分区数,Spark 会优先使用这个值,而忽略 spark.default.parallelism

(4) linesRdd.repartition(1)
  • repartition 方法用于调整 RDD 的分区数。这里指定了分区数为 1

  • repartition 是一个 transformation 操作,它会返回一个新的 RDD,而不会修改原始的 RDD。

  • 在代码中,repartition 的结果没有被赋值给任何变量,因此原始的 linesRdd 仍然保持原来的分区数(即 3 个分区)。


2. 优先级分析

在 Spark 中,分区数的优先级如下:

  1. 显式指定的分区数(例如 parallelize 或 repartition 中的参数)优先级最高。

  2. 如果没有显式指定分区数,Spark 会使用 spark.default.parallelism 的值。

  3. 如果没有设置 spark.default.parallelism,Spark 会根据集群的资源和输入数据的大小自动决定分区数。

在代码中:

  • parallelize 显式指定了分区数为 3,因此优先级最高,RDD 的初始分区数为 3

  • repartition(1) 虽然指定了分区数为 1,但由于其结果没有被赋值给任何变量,原始的 linesRdd 仍然保持分区数为 3


3. 最终分区数

  • 由于 repartition 的结果没有被使用,原始的 linesRdd 仍然保持分区数为 3

  • 因此,最终的分区数是 3


4. 代码执行结果

  • foreach 操作会在原始的 linesRdd 上执行,分区数为 3

  • 每个元素会被分配到 3 个分区中,分区 ID 为 01 和 2

  • 输出示例:

    1 is in partition: 0
    2 is in partition: 0
    3 is in partition: 0
    4 is in partition: 1
    5 is in partition: 1
    6 is in partition: 1
    7 is in partition: 2
    8 is in partition: 2
    9 is in partition: 2
    10 is in partition: 0
    11 is in partition: 0
    12 is in partition: 1
    13 is in partition: 1
    14 is in partition: 2
    15 is in partition: 2


5. 如何正确使用 repartition

如果你希望将 RDD 的分区数调整为 1,并打印出分区信息,需要将 repartition 的结果赋值给一个新的变量,例如:

val repartitionedRdd = linesRdd.repartition(1);
repartitionedRdd.foreach(num => {
  println(num + " is in partition: " + TaskContext.getPartitionId());
});
  • 这样,repartitionedRdd 的分区数会是 1,所有元素都会在分区 0 中。


总结

  • 优先级:显式指定的分区数 > spark.default.parallelism > 自动分区。

  • 最终分区数:代码中最终的分区数是 3,因为 repartition 的结果没有被使用。

  • 修正方法:如果需要将分区数调整为 1,需要将 repartition 的结果赋值给一个新的变量,并在新的 RDD 上执行操作。


http://www.kler.cn/a/571315.html

相关文章:

  • 拉格朗日对偶性(Lagrangian Duality)详解
  • 国产编辑器EverEdit - 优化性能的一些设置项
  • 74道高级Java面试合集,java开发模式面试题
  • 【http://noi.openjudge.cn/】4.3算法之图论——1538:Gopher II
  • 14天 -- Redis 的持久化机制有哪些?Redis 主从复制的实现原理是什么? Redis 数据过期后的删除策略是什么?
  • DeepSeek开源周-汇总
  • VB6网络通信软件开发,上位机开发,TCP网络通信,读写数据并处理,完整源码下载
  • Leetcode 3472. Longest Palindromic Subsequence After at Most K Operations
  • 【零基础到精通Java合集】第十六集:多线程与并发编程
  • vue2(笔记)4.0vueRouter.声明式/编程式导航以及跳转传参.重定向
  • 浅谈汽车系统电压优缺点分析
  • PyTorch 中结合迁移学习和强化学习的完整实现方案
  • 【2025rust笔记】超详细,小白,rust基本语法
  • vue 提升html2canvas渲染速度
  • 第十天-字符串:编程世界的文本基石
  • 深入 Vue.js 组件开发:从基础到实践
  • 深入探索像ChatGPT这样的大语言模型
  • 记一次渗透测试实战:SQL注入漏洞的挖掘与利用
  • Trae:国内首款AI原生IDE,编程效率大提升
  • AI大模型-提示工程学习笔记21-图提示 (Graph Prompting)