Spark Creates Fewer Partitions Than the minPartitions Argument on WholeTextFiles


Question

I have a folder which contains 14 files. I run spark-submit with 10 executors on a cluster whose resource manager is YARN.

I create my first RDD like this:

JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);

However, files.getNumPartitions() gives me 7 or 8, seemingly at random. I do not use coalesce/repartition anywhere after that, so my DAG finishes with 7-8 partitions.
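One way to see how the files were packed is to tag each record with its partition index. A minimal Scala sketch, assuming the same folderPath as above (the Java API used in the question behaves the same way):

    // List which file path ended up in which partition.
    val files = sc.wholeTextFiles(folderPath.toString, 10)
    println(s"numPartitions = ${files.getNumPartitions}")      // 7 or 8 in this case
    files
      .mapPartitionsWithIndex((idx, it) => it.map { case (path, _) => (idx, path) })
      .collect()
      .foreach { case (idx, path) => println(s"partition $idx -> $path") }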

As far as I know, the argument we give is the minimum number of partitions, so why does Spark divide my RDD into only 7-8 partitions?

I also ran the same program with minPartitions = 20, and it gave me 11 partitions.

I have seen a related question here, but it was about getting more partitions, which did not help me at all.

Note: In the same program, I read another folder which has 10 files, and Spark creates 10 partitions successfully. I run the problematic transformation above after that successful job finishes.

File sizes:
1) 25.07 KB
2) 46.61 KB
3) 126.34 KB
4) 158.15 KB
5) 169.21 KB
6) 16.03 KB
7) 67.41 KB
8) 60.84 KB
9) 70.83 KB
10) 87.94 KB
11) 99.29 KB
12) 120.58 KB
13) 170.43 KB
14) 183.87 KB

The files are on HDFS; the block size is 128 MB and the replication factor is 3.

Answer


It would have been clearer if we had the size of each file, but the code is not wrong. I am adding this answer based on the Spark code base.




  • First, maxSplitSize is calculated based on the total directory size and the minPartitions passed to wholeTextFiles:

        def setMinPartitions(context: JobContext, minPartitions: Int) {
          val files = listStatus(context).asScala
          val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
          val maxSplitSize = Math.ceil(totalLen * 1.0 /
            (if (minPartitions == 0) 1 else minPartitions)).toLong
          super.setMaxSplitSize(maxSplitSize)
        }
        // file: WholeTextFileInputFormat.scala
    

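    Applying that formula to the file sizes listed in the question gives a feel for the numbers involved. A back-of-the-envelope sketch (Scala; sizes in KB as listed, whereas the real code uses the byte lengths reported by HDFS):

        // Rough arithmetic only; not Spark code.
        val sizesKB = Seq(25.07, 46.61, 126.34, 158.15, 169.21, 16.03, 67.41,
                          60.84, 70.83, 87.94, 99.29, 120.58, 170.43, 183.87)
        val totalKB = sizesKB.sum                       // ~1402.6 KB
        val maxSplitFor10 = math.ceil(totalKB / 10)     // ~141 KB per split for minPartitions = 10
        val maxSplitFor20 = math.ceil(totalKB / 20)     // ~71 KB per split for minPartitions = 20

    Because wholeTextFiles marks every file as non-splittable, CombineFileInputFormat can only pack whole files into a split, and it closes a split as soon as the accumulated size reaches maxSplitSize. Four of the files (158.15, 169.21, 170.43 and 183.87 KB) already exceed the ~141 KB limit on their own, so most splits end up holding one to three whole files, and the grouping (which follows HDFS block locality) produces 7 or 8 splits rather than 10.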

  • Based on that maxSplitSize, splits (partitions in Spark) are then extracted from the source:

        inputFormat.setMinPartitions(jobContext, minPartitions)
        val rawSplits = inputFormat.getSplits(jobContext).toArray // Here the number of splits is decided
        val result = new Array[Partition](rawSplits.size)
        for (i <- 0 until rawSplits.size) {
          result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
        }
        // file: WholeTextFileRDD.scala
    


  • More detail on how files are read and splits are prepared is available in CombineFileInputFormat#getSplits.
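    The packing itself is essentially a greedy bin-filling over whole files; the real getSplits additionally groups blocks by node and rack, which is where the run-to-run variation between 7 and 8 comes from. A deliberately simplified, hypothetical illustration, reusing sizesKB and the maxSplit values from the sketch above:

        // Simplified illustration only; NOT the real CombineFileInputFormat logic,
        // which also honours node/rack locality when grouping blocks.
        def packWholeFiles(sizesKB: Seq[Double], maxSplitKB: Double): Int = {
          var splits = 0
          var current = 0.0
          for (size <- sizesKB) {
            current += size                      // whole files only: a file is never cut
            if (current >= maxSplitKB) {         // a split is closed once it reaches the limit
              splits += 1
              current = 0.0
            }
          }
          if (current > 0) splits += 1           // leftover files form the last split
          splits
        }

        println(packWholeFiles(sizesKB, maxSplitFor10))  // 8 for the order listed in the question
        println(packWholeFiles(sizesKB, maxSplitFor20))  // 11, matching the observed result for minPartitions = 20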


    Note: I refer to Spark partitions as MapReduce splits here, because Spark borrowed its input and output formatters from MapReduce.
