pyspark/EMR中的大型DataFrame上的collect()或toPandas() [英] collect() or toPandas() on a large DataFrame in pyspark/EMR

查看:319
本文介绍了pyspark/EMR中的大型DataFrame上的collect()或toPandas()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我拥有一台计算机"c3.8xlarge"的EMR群集,在读取了几种资源后,我了解到由于我使用pyspark,必须允许大量的内存堆外使用,因此我将群集配置如下:

I have an EMR cluster of one machine "c3.8xlarge", after reading several resources, I understood that I have to allow decent amount of memory off-heap because I am using pyspark, so I have configured the cluster as follow:

一名执行人:

  • spark.executor.memory 6克
  • spark.executor.cores 10
  • spark.yarn.executor.memoryOverhead 4096

驱动程序:

  • spark.driver.memory 21克

当我cache() DataFrame时,它需要大约3.6GB的内存.

When I cache() the DataFrame it takes about 3.6GB of memory.

现在,当我在DataFrame上调用collect()toPandas()时,进程崩溃.

Now when I call collect() or toPandas() on the DataFrame, the process crashes.

我知道我正在将大量数据带入驱动程序,但是我认为它不是那么大,而且我无法弄清崩溃的原因.

I know that I am bringing a large amount of data into the driver, but I think that it is not that large, and I am not able to figure out the reason of the crash.

当我呼叫collect()toPandas()时,出现此错误:

When I call collect() or toPandas() I get this error:

Py4JJavaError: An error occurred while calling o181.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 6.0 failed 4 times, most recent failure: Lost task 5.3 in stage 6.0 (TID 110, ip-10-0-47-207.prod.eu-west-1.hs.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1511879540686_0005_01_000016 on host: ip-10-0-47-207.prod.eu-west-1.hs.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

====更新====

按照@ user6910411的建议,我已经尝试在此处中提及的解决方案,在这种情况下,我出现以下错误:

As @user6910411 suggested, I have tried the solution mentioned here, and in that case I get the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 41, ip-10-0-33-57.prod.eu-west-1.hs.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 13.5 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

关于这里发生什么的任何提示吗?

Any hint about what is happening here?

推荐答案

TL; DR 我相信您严重低估了内存需求.

TL;DR I believe you're seriously underestimating memory requirements.

即使假设数据已完全缓存,存储信息也只会显示将数据带回驱动程序所需的峰值内存的一小部分.

Even assuming that data is fully cached, storage info will show only a fraction of peak memory required for bringing data back to the driver.

  • First of all Spark SQL uses compressed columnar storage for caching. Depending on the data distribution and compression algorithm in-memory size can be much smaller than the uncompressed Pandas output, not to mention plain List[Row]. The latter also stores column names, further increasing memory usage.
  • Data collection is indirect, with data being stored both on the JVM side and Python side. While JVM memory can be released once data goes through socket, peak memory usage should account for both.
  • Plain toPandas implementation collects Rows first, then creates Pandas DataFrame locally. This further increases (possibly doubles) memory usage. Luckily this part is already addressed on master (Spark 2.3), with more direct approach using Arrow serialization (SPARK-13534 - Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas).

对于独立于Apache Arrow的可能解决方案,您可以检查

For possible solution independent of Apache Arrow you can check Faster and Lower memory implementation toPandas on the Apache Spark Developer List.

由于数据实际上非常大,我会考虑将其写入Parquet并使用PyArrow在Python中直接读回(读写Apache Parquet格式)完全跳过了所有中间阶段.

Since data is actually pretty large I would consider writing it to Parquet and reading it back directly in Python using PyArrow (Reading and Writing the Apache Parquet Format) completely skipping all the intermediate stages.

这篇关于pyspark/EMR中的大型DataFrame上的collect()或toPandas()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆