Spark: 'Requested array size exceeds VM limit' when writing dataframe

Problem description

I am running into an "OutOfMemoryError: Requested array size exceeds VM limit" error when running my Scala Spark job.

I'm running this job on an AWS EMR cluster with the following makeup:

Master: 1 m4.4xlarge 32 vCore, 64 GiB memory

Core: 1 r3.4xlarge 32 vCore, 122 GiB memory

The version of Spark I'm using is 2.2.1 on EMR release label 5.11.0.

I'm running my job in a spark shell with the following configurations:

spark-shell --conf spark.driver.memory=40G \
--conf spark.driver.maxResultSize=25G \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=2000 \
--conf spark.rpc.message.maxSize=2000 \
--conf spark.dynamicAllocation.enabled=true

What I'm attempting to do with this job is to convert a one column dataframe of objects into a one row dataframe that contains a list of those objects.

The objects look like the following:

case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)

My dataframe schema is as follows:

root
 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- type: string (nullable = false)
 |-- properties: struct (nullable = false)
 |    |-- id: string (nullable = true)
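
For reference, a dataframe with this shape can be built straight from the case classes above in spark-shell; the sample values below are purely illustrative and are not taken from the actual data:

import spark.implicits._

// Hypothetical sample rows, only to illustrate the structure of the dataframe
val df = Seq(
  Features("Feature", Properties("id-1"),
    Geometry("Polygon", Seq(Seq(Seq("-122.4", "37.8"), Seq("-122.5", "37.7"))))),
  Features("Feature", Properties("id-2"),
    Geometry("Polygon", Seq(Seq(Seq("-73.9", "40.7"), Seq("-74.0", "40.6")))))
).toDF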

I'm converting it to a list and adding it to a one row dataframe like so:

val x = Seq(df.collect.toList)
final_df.withColumn("features", typedLit(x))

I don't run into any issues when creating this list and it's pretty quick. However, there seems to be a limit to the size of this list when I try to write it out by doing either of the following:

final_df.first
final_df.write.json(s"s3a://<PATH>/")

I've also tried converting the list to a dataframe by doing the following, but it never seems to finish.

val x = Seq(df.collect.toList)
val y = x.toDF

The largest list I've been able to get this dataframe to work with had 813318 Features objects, each of which contains a Geometry object holding a list of 33 elements, for a total of 29491869 elements.

Attempting to write pretty much any list larger than that gives me the following stacktrace when running my job.

# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)

I've tried making a million configuration changes, including throwing both more driver and executor memory at this job, but to no avail. Is there any way around this? Any ideas?

Recommended answer

Well, there is a dataframe aggregation function that does what you want without doing a collect on the driver. For example if you wanted to collect all "feature" columns by key: df.groupBy($"key").agg(collect_list("feature")), or if you really wanted to do that for the whole dataframe without grouping: df.agg(collect_list("feature")).
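
As a rough sketch, the second (ungrouped) variant applied to the dataframe from the question could look like the following. The struct(...) wrapper and the aggregated name are my own additions, since the question's dataframe has three top-level columns rather than a single "feature" column; the features column name mirrors the one used in the question:

import org.apache.spark.sql.functions.{col, collect_list, struct}

// Build the single array-of-structs row inside Spark instead of collecting
// every row to the driver and re-embedding it as a literal
val aggregated = df.agg(
  collect_list(struct(col("type"), col("properties"), col("geometry"))).as("features")
)

aggregated.write.json("s3a://<PATH>/")

Keep in mind that the result still has to fit into a single row: the "Requested array size exceeds VM limit" error thrown from BufferHolder.grow in the stack trace means one row's backing byte array would exceed the JVM's roughly 2 GB array cap, and a sufficiently large collected list can hit the same wall either way.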

However I wonder why you'd want to do that, when it seems easier to work with a dataframe with one row per object than a single row containing the entire result. Even using the collect_list aggregation function I wouldn't be surprised if you still run out of memory.
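
If one JSON document per feature is acceptable, the simplest route along those lines is probably to skip the aggregation entirely and write the per-feature dataframe as it stands (a minimal sketch, assuming the original df from the question):

// Each output record is a single JSON object with geometry, type and properties,
// so no individual row ever has to hold the entire dataset
df.write.json("s3a://<PATH>/")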
