Large task size for simplest program

Problem description

I am trying to run the simplest program with Spark

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    val data = sc.parallelize(dat).cache()
    for(i <- 1 to 100){
      println(data.reduce(_ + _))
    }
  }   
}

After each iteration, I get the following error message:

WARN TaskSetManager: Stage 0 contains a task of very large size (9767 KB). The maximum recommended task size is 100 KB.

Increasing the data size increases said task size. This suggests to me that the driver is shipping the "dat" object to all executors, but I can't for the life of me see why, as the only operation on my RDD is reduce, which basically has no closure. Any ideas?

Recommended answer

Because you create the very large list locally first, the Spark parallelize method is trying to ship this list to the Spark workers as a single unit, as part of a task. Hence the warning message you receive. As an alternative, you could parallelize a much smaller list, then use flatMap to explode it into the larger list. This also has the benefit of creating the larger set of numbers in parallel. For example:

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest extends App {

  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val dat = (0 to 99).toList
  val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
  println(data.count()) //100000000
  println(data.reduce(_ + _))
  sc.stop()
}

Ultimately the local collection being parallelized has to be pushed to the executors. The parallelize method creates an instance of ParallelCollectionRDD:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L730

ParallelCollectionRDD creates a number of partitions equal to numSlices:

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L96

numSlices defaults to sc.defaultParallelism, which on my machine is 4. So even when split, each partition contains a very large list which needs to be pushed to an executor.
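
As a side note (my own illustration, not part of the original answer): passing numSlices to parallelize explicitly splits the same driver-side list into more, smaller tasks, which shrinks the per-task payload but does not stop the list being built and serialized on the driver. A minimal sketch, where 1000 is an arbitrary value chosen only for illustration:

// Sketch only: more slices => smaller per-task payloads, but the full
// 10,000,000-element list is still created and serialized on the driver.
val data = sc.parallelize(dat, numSlices = 1000)
println(data.partitions.length) // 1000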

SparkContext.parallelize contains the note "@note Parallelize acts lazily", and ParallelCollectionRDD contains the following comment:

// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.

So it appears that the problem happens when you call reduce, because that is the point at which the partitions are sent to the executors, but the root cause is that you are calling parallelize on a very big list. Generating the large list within the executors is a better approach, IMHO.
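
As a further sketch (again my own, not from the original answer, and assuming SparkContext.range is available in your Spark version): a numeric range can be described to the executors by its endpoints alone, so each partition generates its own numbers and nothing large ever leaves the driver:

import org.apache.spark.{SparkContext, SparkConf}

object RangeSumTest extends App {
  val conf = new SparkConf().setAppName("RangeSumTest").setMaster("local[*]")
  val sc = new SparkContext(conf)

  // sc.range ships only the (start, end, step, numSlices) description to each partition;
  // the Longs themselves are materialised on the executors.
  val data = sc.range(1L, 10000001L).cache() // 1 to 10,000,000 (end is exclusive)
  println(data.reduce(_ + _))                // sum of 1..10,000,000
  sc.stop()
}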
