PySpark Yarn Application fails on groupBy


Question

I'm trying to run a job in Yarn mode that processes a large amount of data (2TB) read from Google Cloud Storage.

The pipeline can be summarized like this:

sc.textFile("gs://path/*.json")\
.map(lambda row: json.loads(row))\
.map(toKvPair)\
.groupByKey().take(10)

 [...] later processing on collections and output to GCS.
  This computation over the elements of the collections is not associative;
  each element is sorted in its keyspace.

When run on 10GB of data, it completes without any issue. However, when I run it on the full dataset, it fails every time with these logs in the containers:

15/11/04 16:08:07 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: xxxxxxxxxxx
15/11/04 16:08:07 ERROR org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
15/11/04 16:08:07 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down

I tried investigating by launching each operation one by one while connected to the master, and it seems to fail on the groupBy. I also tried to rescale the cluster by adding nodes and upgrading their memory and number of CPUs, but I still have the same issue.

120 nodes + 1 master with the same specs: 8 vCPUs - 52GB memory

I tried to find threads with a similar issue without success, so I don't really know what information I should provide since the logs aren't very clear. Feel free to ask for more info.

The primary key is a required value for every record and we need all the keys without filtering, which represents roughly 600k keys. Is it really possible to perform this operation without scaling the cluster to something massive? I just read that Databricks did a sort on 100TB of data (https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html), which also involved a massive shuffle. They succeeded by replacing multiple in-memory buffers with a single one, resulting in a lot of disk IO. Is it possible to perform such an operation with my cluster size?

Answer

To summarize what we learned through the comments on the original question: if a small dataset works (especially one that potentially fits in a single machine's total memory) but a large dataset fails despite significantly more nodes being added to the cluster, and any usage of groupByKey is involved, the most common thing to look for is whether your data has a significant imbalance in the number of records per key.
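
A quick way to check for that kind of skew is to count records per key and look at the heaviest groups. This is only a hedged sketch: kv_pairs stands in for the (key, value) RDD produced by the .map(toKvPair) step above and is not a name from the original pipeline.

from operator import add

# kv_pairs is assumed to be the (key, value) RDD from the toKvPair step.
key_counts = kv_pairs.map(lambda kv: (kv[0], 1)).reduceByKey(add)
print(key_counts.top(10, key=lambda kv: kv[1]))  # the ten heaviest keys

If a handful of keys account for most of the records, that is exactly the imbalance described above.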

In particular, groupByKey as of today still has the constraint that not only must all the values for a single key get shuffled to the same machine, they must also all be able to fit in memory:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L503

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
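
Where the per-key work really is an aggregation, that note translates directly to PySpark. This is only a hedged sketch: kv_pairs is again the assumed (key, value) RDD from the toKvPair step, the values are treated as numbers purely for illustration, and, as noted in the question, this only applies when the per-key computation is associative.

from operator import add

# reduceByKey combines values map-side before the shuffle, so no single
# executor ever has to hold an entire group in memory at once.
per_key_totals = kv_pairs.reduceByKey(add)
print(per_key_totals.take(10))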

There's some further discussion of this problem which points at a mailing list discussion (http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487) that includes some discussion of workarounds; namely, you may be able to explicitly append a hash string of the value/record to the key, hashed into some small set of buckets, so that you manually shard out your large groups.

In your case you could even do an initial .map transform which only conditionally adjusts the keys of known hot keys to divide them into subgroups, while leaving non-hot keys unaltered.
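
A minimal sketch of that salting idea follows. It is hedged: kv_pairs, HOT_KEYS, NUM_BUCKETS, and the use of a CRC32 hash are all illustrative assumptions standing in for the real key logic, not part of the original pipeline.

import zlib

# Hypothetical set of keys known to be heavily skewed, and the number of
# sub-buckets to spread each of them over.
HOT_KEYS = {"some_hot_key"}
NUM_BUCKETS = 32

def salt_key(kv):
    """Append a small bucket id, derived from the record itself, to hot keys
    so their values are spread across several groups; non-hot keys keep a
    single bucket and are effectively unchanged."""
    key, value = kv
    if key in HOT_KEYS:
        bucket = zlib.crc32(str(value).encode("utf-8")) % NUM_BUCKETS
        return ((key, bucket), value)
    return ((key, 0), value)

grouped = kv_pairs.map(salt_key).groupByKey()

Later processing then has to merge the sub-groups that share the same original key, for example by stripping the bucket id and combining the pieces.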

In general, the "in-memory" constraint means you can't really get around significantly skewed keys by adding more nodes, since that requires scaling "in-place" on the hot node. For specific cases you may be able to set spark.executor.memory as a --conf or, on Dataproc, with gcloud beta dataproc jobs submit spark [other flags] --properties spark.executor.memory=30g, as long as the max key's values can all fit in that 30g (with some headroom/overhead as well). But that will top out at whatever the largest available machine is, so if there's any chance that the size of the max key will grow as your overall dataset grows, it's better to change the key distribution itself rather than try to crank up single-executor memory.
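
If you construct the SparkContext yourself rather than passing --properties at submit time, the same setting can be applied through SparkConf. This is a hedged sketch; the 30g figure and the application name are only the example values from above.

from pyspark import SparkConf, SparkContext

# Executor memory must be set before the context is created; it cannot be
# changed on an already-running SparkContext.
conf = SparkConf() \
    .setAppName("groupby-job") \
    .set("spark.executor.memory", "30g")
sc = SparkContext(conf=conf)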
