Large task size for simplest program


Question

I am trying to run the simplest program with Spark

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    val data = sc.parallelize(dat).cache()
    for(i <- 1 to 100){
      println(data.reduce(_ + _))
    }
  }   
}

I get the following error message after each iteration:

WARN TaskSetManager: Stage 0 contains a task of very large size (9767 KB). The maximum recommended task size is 100 KB.

Increasing the data size increases said task size. This suggests to me that the driver is shipping the "dat" object to all executors, but I can't for the life of me see why, as the only operation on my RDD is reduce, which basically has no closure. Any ideas?

Answer

Because you create the very large list locally first, the Spark parallelize method is trying to ship this list to the Spark workers as a single unit, as part of a task. Hence the warning message you receive. As an alternative, you could parallelize a much smaller list, then use flatMap to explode it into the larger list. This also has the benefit of creating the larger set of numbers in parallel. For example:

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest extends App {

  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val dat = (0 to 99).toList
  val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
  println(data.count()) //100000000
  println(data.reduce(_ + _))
  sc.stop()
}

Ultimately, the local collection being parallelized has to be pushed to the executors. The parallelize method creates an instance of ParallelCollectionRDD:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L730

ParallelCollectionRDD creates a number of partitions equal to numSlices:

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

numSlices defaults to sc.defaultParallelism, which on my machine is 4. So even when split, each partition contains a very large list which needs to be pushed to an executor.
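
To see the effect of this parameter, here is a minimal sketch (the object name ExplicitSlicesTest is only for illustration) that passes numSlices explicitly. More slices shrink each individual task, but this does not address the root cause: the full list is still built and serialized on the driver and shipped inside the tasks.

import org.apache.spark.{SparkContext, SparkConf}

object ExplicitSlicesTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    // 100 slices instead of sc.defaultParallelism: each task carries a
    // smaller slice, but the whole list still originates on the driver.
    val data = sc.parallelize(dat, 100).cache()
    println(data.count())
    sc.stop()
  }
}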

SparkContext.parallelize contains the note @note Parallelize acts lazily, and ParallelCollectionRDD contains the comment:

// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.

So it appears that the problem happens when you call reduce, because this is the point at which the partitions are sent to the executors, but the root cause is that you are calling parallelize on a very big list. Generating the large list within the executors is a better approach, IMHO.
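
As a variant of the flatMap approach above, here is a sketch assuming Spark 1.5 or later, where SparkContext.range is available. It ships only a (start, end, step) description to the executors, and each partition generates its own numbers, so the tasks stay tiny. The object name RangeTest is just for this example.

import org.apache.spark.{SparkContext, SparkConf}

object RangeTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Only the range description travels with the task; the numbers are
    // materialized inside each partition on the executors.
    val data = sc.range(1L, 10000001L).cache()
    println(data.count())        // 10000000
    println(data.reduce(_ + _))  // Long sum, so no Int overflow
    sc.stop()
  }
}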
