Spark: 'Requested array size exceeds VM limit' when writing dataframe

Problem description

I am running into a "OutOfMemoryError: Requested array size exceeds VM limit" error when running my Scala Spark job.

I'm running this job on an AWS EMR cluster with the following makeup:

Master: 1 m4.4xlarge 32 vCore, 64 GiB memory

Core: 1 r3.4xlarge 32 vCore, 122 GiB memory

The version of Spark I'm using is 2.2.1 on EMR release label 5.11.0.

I'm running my job in a spark shell with the following configurations:

spark-shell --conf spark.driver.memory=40G \
  --conf spark.driver.maxResultSize=25G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer.max=2000 \
  --conf spark.rpc.message.maxSize=2000 \
  --conf spark.dynamicAllocation.enabled=true

What I'm attempting to do with this job is to convert a one column dataframe of objects into a one row dataframe that contains a list of those objects.

The objects are as follows:

case class Properties (id: String)
case class Geometry (`type`: String, coordinates: Seq[Seq[Seq[String]]])
case class Features (`type`: String, properties: Properties, geometry: Geometry)

My dataframe schema is as follows:

root
 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- type: string (nullable = false)
 |-- properties: struct (nullable = false)
 |    |-- id: string (nullable = true)

I'm converting it to a list and adding it to a one row dataframe like so:

// Collect every row to the driver and wrap the resulting list so it can be
// embedded as a single literal column (typedLit is from org.apache.spark.sql.functions).
val x = Seq(df.collect.toList)
final_df.withColumn("features", typedLit(x))

I don't run into any issues when creating this list and it's pretty quick. However, there seems to be a limit to the size of this list when I try to write it out by doing either of the following:

final_df.first
final_df.write.json(s"s3a://<PATH>/")

I've also tried to convert the list to a dataframe by doing the following, but it never seems to finish.

val x = Seq(df.collect.toList)
val y = x.toDF

The largest list I've been able to get this dataframe to work with had 813318 Features objects, each of which contains a Geometry object holding a list of 33 elements, for a total of 29491869 elements.

Attempting to write pretty much any list larger than that gives me the following stacktrace when running my job.

# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 33028"...
os::fork_and_exec failed: Cannot allocate memory (12)
18/03/29 21:41:35 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.write(UnsafeArrayWriter.java:217)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply1_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:41)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:41)
    at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:36)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
    at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:52)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)

I've tried making a million configuration changes, including throwing both more driver and executor memory at this job, but to no avail. Is there any way around this? Any ideas?

Recommended answer

Well, there is a dataframe aggregation function that does what you want without doing a collect on the driver. For example, if you wanted to collect all "feature" values by key: df.groupBy($"key").agg(collect_list("feature")), or, if you really wanted to do that for the whole dataframe without grouping: df.agg(collect_list("feature")).
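
A minimal sketch of that approach, assuming a small dataframe with placeholder columns "key" and "feature" (the column names and the example data are illustrative, not taken from the job above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder.appName("collect-list-sketch").getOrCreate()
import spark.implicits._

// Hypothetical example data standing in for the real features dataframe.
val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "feature")

// One output row per key, each holding the list of that key's feature values.
val perKey = df.groupBy($"key").agg(collect_list("feature").as("features"))

// A single output row for the whole dataframe, without grouping; the
// aggregation runs on the executors, but the one resulting row still has to
// fit in memory when it is collected or written.
val whole = df.agg(collect_list("feature").as("features"))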

However, I wonder why you'd want to do that, when it seems easier to work with a dataframe that has one row per object than with a single row containing the entire result. Even with the collect_list aggregation function, I wouldn't be surprised if you still ran out of memory.
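
If a one-row-per-object layout is acceptable, a hedged sketch of that simpler path is just to write the original dataframe out directly, reusing the <PATH> placeholder from the question:

// Sketch: write one JSON object per Feature row instead of building one giant
// row, so no single row has to hold the entire result.
df.write.json(s"s3a://<PATH>/")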
