Spark 中分区相关设置
import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object partitionsobject {
def main(args: Array[String]): Unit = {
//配置运行环境
val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").
setMaster("local[*]").
set("spark.default.parallelism","10");
val sc: SparkContext = new spark.SparkContext(conf);
//创建RDD
val linesRdd =sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),3);
linesRdd.repartition(1);
linesRdd.foreach(num => {
println(num + " is in partition: " + TaskContext.getPartitionId());
});
}
}
分析一下这段代码中
setMaster("local[*]")、
set("spark.default.parallelism","10")、
sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),3)、
linesRdd.repartition(1)这几个设置的优先级,以及数据最后的分区数?
在这段代码中,涉及到了多个与分区数相关的设置,包括 setMaster
、spark.default.parallelism
、parallelize
的分区参数以及 repartition
方法。我们需要分析这些设置的优先级,以及最终 RDD 的分区数。
1. 设置分析
(1) setMaster("local[*]")
-
这个设置指定了 Spark 应用程序运行在本地模式,
[*]
表示使用本地机器的所有可用 CPU 核心。 -
这个设置本身不会直接影响分区数,但它会影响集群的资源分配和并行度。
(2) set("spark.default.parallelism", "10")
-
这个设置指定了 Spark 的默认并行度为
10
。 -
spark.default.parallelism
是 Spark 的一个全局配置参数,用于控制 RDD 的默认分区数。 -
如果没有显式指定分区数(例如在
parallelize
或repartition
中),Spark 会使用这个值作为默认的分区数。
(3) sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15), 3)
-
这里显式地指定了分区数为
3
。 -
在
parallelize
方法中,如果显式指定了分区数,Spark 会优先使用这个值,而忽略spark.default.parallelism
。
(4) linesRdd.repartition(1)
-
repartition
方法用于调整 RDD 的分区数。这里指定了分区数为1
。 -
repartition
是一个 transformation 操作,它会返回一个新的 RDD,而不会修改原始的 RDD。 -
在代码中,
repartition
的结果没有被赋值给任何变量,因此原始的linesRdd
仍然保持原来的分区数(即 3 个分区)。
2. 优先级分析
在 Spark 中,分区数的优先级如下:
-
显式指定的分区数(例如
parallelize
或repartition
中的参数)优先级最高。 -
如果没有显式指定分区数,Spark 会使用
spark.default.parallelism
的值。 -
如果没有设置
spark.default.parallelism
,Spark 会根据集群的资源和输入数据的大小自动决定分区数。
在代码中:
-
parallelize
显式指定了分区数为3
,因此优先级最高,RDD 的初始分区数为3
。 -
repartition(1)
虽然指定了分区数为1
,但由于其结果没有被赋值给任何变量,原始的linesRdd
仍然保持分区数为3
。
3. 最终分区数
-
由于
repartition
的结果没有被使用,原始的linesRdd
仍然保持分区数为3
。 -
因此,最终的分区数是
3
。
4. 代码执行结果
-
foreach
操作会在原始的linesRdd
上执行,分区数为3
。 -
每个元素会被分配到 3 个分区中,分区 ID 为
0
、1
和2
。 -
输出示例:
1 is in partition: 0 2 is in partition: 0 3 is in partition: 0 4 is in partition: 1 5 is in partition: 1 6 is in partition: 1 7 is in partition: 2 8 is in partition: 2 9 is in partition: 2 10 is in partition: 0 11 is in partition: 0 12 is in partition: 1 13 is in partition: 1 14 is in partition: 2 15 is in partition: 2
5. 如何正确使用 repartition
?
如果你希望将 RDD 的分区数调整为 1
,并打印出分区信息,需要将 repartition
的结果赋值给一个新的变量,例如:
val repartitionedRdd = linesRdd.repartition(1);
repartitionedRdd.foreach(num => {
println(num + " is in partition: " + TaskContext.getPartitionId());
});
-
这样,
repartitionedRdd
的分区数会是1
,所有元素都会在分区0
中。
总结
-
优先级:显式指定的分区数 >
spark.default.parallelism
> 自动分区。 -
最终分区数:代码中最终的分区数是
3
,因为repartition
的结果没有被使用。 -
修正方法:如果需要将分区数调整为
1
,需要将repartition
的结果赋值给一个新的变量,并在新的 RDD 上执行操作。