Spark RDD default number of partitions
Question
Version: Spark 1.6.2, Scala 2.10
I am executing the commands below in the spark-shell. I am trying to see the number of partitions that Spark creates by default.
val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4
//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
As per the Apache Spark documentation, spark.default.parallelism is the number of cores on my laptop (which has a 2-core processor).
My question is: rdd2 seems to give the correct result of 2 partitions, as stated in the documentation. But why does rdd1 give 4 partitions?
Answer
The minimum number of partitions is actually a lower bound set by the SparkContext. Since Spark uses Hadoop under the hood, the Hadoop InputFormat will still drive the behaviour by default.
The first case should reflect defaultParallelism, as mentioned here, which may differ depending on settings and hardware (number of cores, etc.).
So unless you provide the number of slices, the first case is defined by the number described by sc.defaultParallelism:
scala> sc.defaultParallelism
res0: Int = 6
scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
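A quick way to confirm this (a sketch continuing the same shell session, where defaultParallelism happens to be 6): passing numSlices explicitly overrides the default, since parallelize slices the collection into exactly that many partitions.

```scala
scala> sc.parallelize(1 to 100, 10).partitions.size
// res2: Int = 10
```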
As for the second case, with sc.textFile, the number of slices by default is the minimum number of partitions.
Which is equal to 2, as you can see in this section of the code.
Thus, you should consider the following:
- sc.parallelize will take numSlices if provided, otherwise defaultParallelism.
- sc.textFile will take the maximum between minPartitions and the number of splits computed based on the hadoop input split size divided by the block size.
- sc.textFile calls sc.hadoopFile, which creates a HadoopRDD that uses InputFormat.getSplits under the hood [Ref. InputFormat documentation].
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException: Logically split the set of input files for the job. Each InputSplit is then assigned to an individual Mapper for processing. Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be a tuple. Parameters: job - job configuration. numSplits - the desired number of splits, a hint. Returns: an array of InputSplits for the job. Throws: IOException.
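Note that numSplits is only a hint: Hadoop treats it as a desired lower bound and may return more splits. You can pass it yourself as the second argument of textFile; a sketch (assumes a running spark-shell, reusing the file from the question):

```scala
// minPartitions is forwarded to InputFormat.getSplits as the numSplits hint
scala> sc.textFile("C:/test1.txt", 8).getNumPartitions
```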
Example:
Let's create some dummy text files:
fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt
This will create two files of size 241MB and 4GB respectively.
We can see what happens when we read each of these files:
scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> rdd.getNumPartitions
// res0: Int = 8
scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> rdd2.getNumPartitions
// res1: Int = 128
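Why 8 and 128? On a local filesystem, the block size Hadoop reports defaults to 32MB (fs.local.block.size), and FileInputFormat derives the split size from the total file size, the numSplits hint, and that block size. A simplified sketch of the arithmetic (this mirrors the Hadoop logic under the stated assumptions; SplitEstimate and its constants are illustrative, not Spark API):

```scala
// Simplified model of FileInputFormat.getSplits, assuming the default
// local-filesystem block size of 32MB (fs.local.block.size).
object SplitEstimate {
  val blockSize: Long = 32L * 1024 * 1024 // 32MB local FS default (assumed)
  val minSplitSize: Long = 1L

  def numSplits(totalSize: Long, minPartitions: Int): Long = {
    // goalSize: what each split would be if we honoured the hint exactly
    val goalSize = totalSize / math.max(minPartitions, 1)
    // actual split size is capped by the block size
    val splitSize = math.max(minSplitSize, math.min(goalSize, blockSize))
    // Hadoop's loop allows 10% slop on the last split; a plain ceiling
    // gives the same count for these sizes
    (totalSize + splitSize - 1) / splitSize
  }

  def main(args: Array[String]): Unit = {
    println(numSplits(241L * 1024 * 1024, 2))  // bigfile.txt:  241/32 -> 8
    println(numSplits(4096L * 1024 * 1024, 2)) // hugefile.txt: 4096/32 -> 128
  }
}
```

So both partition counts fall out of ceil(totalSize / blockSize) once the file is much larger than minPartitions * blockSize.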
Both of them are actually HadoopRDDs:
scala> rdd.toDebugString
// res2: String =
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
// | bigfile.txt HadoopRDD[0] at textFile at <console>:27 []
scala> rdd2.toDebugString
// res3: String =
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
// | hugefile.txt HadoopRDD[2] at textFile at <console>:27 []