Spark RDD default number of partitions


Problem description

Version: Spark 1.6.2, Scala 2.10

I am executing the commands below in the spark-shell. I am trying to see the number of partitions that Spark creates by default.

val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4

//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2

As per the Apache Spark documentation, spark.default.parallelism is the number of cores on my laptop (which has a 2-core processor).

My question is: rdd2 seems to give the correct result of 2 partitions, as stated in the documentation. But why does rdd1 give a result of 4 partitions?

Recommended answer

The minimum number of partitions is actually a lower bound set by the SparkContext. Since Spark uses Hadoop under the hood, the Hadoop InputFormat behaviour will still apply by default.

The first case should reflect defaultParallelism, as mentioned in the documentation, which may differ depending on settings and hardware (number of cores, etc.).

So unless you provide the number of slices, the first case is defined by the number described by sc.defaultParallelism:

scala> sc.defaultParallelism
res0: Int = 6

scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6

As for the second case, with sc.textFile, the number of slices is by default the minimum number of partitions.

This is equal to 2, as you can see in this section of the code (SparkContext.scala).
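
A quick way to inspect this lower bound in the spark-shell is SparkContext's defaultMinPartitions, which is defined as math.min(defaultParallelism, 2):

scala> sc.defaultMinPartitions
// => 2  (math.min(defaultParallelism, 2), with the defaultParallelism of 6 shown above)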

Thus, you should consider the following:

  • sc.parallelize will take numSlices if provided, otherwise defaultParallelism (see the sketch after this list).

  • sc.textFile will take the maximum between minPartitions and the number of splits computed based on the hadoop input split size divided by the block size.

  • sc.textFile calls sc.hadoopFile, which creates a HadoopRDD that uses InputFormat.getSplits under the hood [Ref. InputFormat documentation].

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException: Logically split the set of input files for the job. Each InputSplit is then assigned to an individual Mapper for processing. Note: the split is a logical split of the inputs and the input files are not physically split into chunks. For example, a split could be an <input-file-path, start, offset> tuple. Parameters: job - job configuration. numSplits - the desired number of splits, a hint. Returns: an array of InputSplits for the job. Throws: IOException.
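
As a minimal sketch of the two points above, both defaults can be overridden explicitly (reusing the question's test1.txt path; the exact count for textFile depends on the input format, so the comments are only indicative):

scala> sc.parallelize(1 to 100, 10).getNumPartitions
// => 10: an explicit numSlices argument wins over defaultParallelism

scala> sc.textFile("C:/test1.txt", 5).getNumPartitions
// => typically >= 5: minPartitions is only a lower-bound hint passed to InputFormat.getSplits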

Example:

Let's create some dummy text files:

fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt

This will create two files of size 241MB and 4GB respectively.

We can see what happens when we read each of these files:

scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> rdd.getNumPartitions
// res0: Int = 8

scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27

scala> rdd2.getNumPartitions
// res1: Int = 128
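
Why 8 and 128? Assuming Hadoop's default 32MB split size for the local filesystem (fs.local.block.size), FileInputFormat produces roughly ceil(241MB / 32MB) = 8 splits for the first file and 4096MB / 32MB = 128 for the second, which matches the partition counts above.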

They are both in fact HadoopRDDs:

scala> rdd.toDebugString
// res2: String = 
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
//  |  bigfile.txt HadoopRDD[0] at textFile at <console>:27 []

scala> rdd2.toDebugString
// res3: String = 
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
//   |   hugefile.txt HadoopRDD[2] at textFile at <console>:27 []
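
Finally, a hedged sketch of how minPartitions interacts with the computed number of splits (the counts in the comments assume the same 32MB split size as above):

scala> sc.textFile("bigfile.txt", 3).getNumPartitions
// => still 8: the computed split count already exceeds the requested minimum

scala> sc.textFile("bigfile.txt", 100).getNumPartitions
// => roughly 100: a larger minPartitions shrinks the goal split size, yielding more splits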
