SparkR collect method crashes with OutOfMemory on Java heap space

Problem description

With SparkR, I'm trying, as a PoC, to collect an RDD that I created from text files containing around 4M lines.

My Spark cluster is running in Google Cloud, deployed with bdutil, and is composed of 1 master and 2 workers with 15 GB of RAM and 4 cores each. My HDFS repository is based on Google Storage with gcs-connector 1.4.0. SparkR is installed on each machine, and basic tests work on small files.

Here is the script I use:

Sys.setenv("SPARK_MEM" = "1g")
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir=list(spark.executor.memory="1g"))
lines <- textFile(sc, "gs://xxxx/dir/")
test <- collect(lines)

The first time I run this, it seems to work fine: all the tasks run successfully and Spark's UI says the job completed, but I never get the R prompt back:

15/06/04 13:36:59 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around.
15/06/04 13:36:59 INFO Slf4jLogger: Slf4jLogger started
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52439
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/04 13:37:00 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

15/06/04 13:37:54 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library is available
15/06/04 13:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library not loaded
15/06/04 13:37:55 INFO FileInputFormat: Total input paths to process : 68
[Stage 0:=======================================================>                                                                                     (27 + 10) / 68]

Then, after a CTRL-C to get the R prompt back, I try to run the collect method again; here is the result:

[Stage 1:==========================================================>                                                                                   (28 + 9) / 68]15/06/04 13:42:08 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at org.spark_project.protobuf.ByteString.toByteArray(ByteString.java:515)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:64)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I understand the exception message, but I don't understand why I get it the second time. Also, why does the collect never return after completing in Spark?

I Googled every piece of information I have, but had no luck finding a solution. Any help or hint would be greatly appreciated!

Thanks

Answer

This appears to be a combination of inefficient Java in-memory object representations and some apparently long-lived object references, which together cause some collections to fail to be garbage-collected in time for the new collect() call to overwrite the old one in place.

I experimented with some options, and for my sample 256MB file containing ~4M lines I indeed reproduced your behavior: collect is fine the first time but OOMs the second time when using SPARK_MEM=1g. I then set SPARK_MEM=4g instead, and with that I'm able to ctrl+c and re-run test <- collect(lines) as many times as I want.
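
For reference, a minimal sketch of that change applied to the script from the question (4g is simply what worked for my 256MB sample; the right value depends on your data size and the RAM available to the SparkR backend):

# Same script as in the question, but with a bigger heap for the backend JVM.
# 4g worked for my 256MB / ~4M-line sample; adjust to your data and RAM.
Sys.setenv("SPARK_MEM" = "4g")
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir=list(spark.executor.memory="1g"))
lines <- textFile(sc, "gs://xxxx/dir/")
test <- collect(lines)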

For one thing, even if references didn't leak, note that after the first time you run test <- collect(lines), the variable test is holding that gigantic array of lines, and the second time you call it, collect(lines) executes before the result is finally assigned to the test variable, so in any straightforward instruction ordering there's no way to garbage-collect the old contents of test. This means the second run makes the SparkRBackend process hold two copies of the entire collection at the same time, leading to the OOM you saw.
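
To make the ordering concrete, this is roughly what the two runs amount to (illustrative only, same code as in the question):

# First run: one full copy of the collected data ends up referenced by `test`.
test <- collect(lines)
# Second run: the right-hand side materializes a complete new copy *before*
# the assignment rebinds `test`, so the old copy stays reachable (and cannot
# be garbage-collected) for the whole duration of the new collect.
test <- collect(lines)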

To diagnose this, I started SparkR on the master and first ran:

dhuo@dhuo-sparkr-m:~$ jps | grep SparkRBackend
8709 SparkRBackend

I also checked top, and it was using around 22MB of memory. I fetched a heap profile with jmap:

jmap -heap:format=b 8709   # write a binary heap dump (heap.bin) of the SparkRBackend JVM
mv heap.bin heap0.bin      # keep this dump as the pre-collect baseline

Then I ran the first round of test <- collect(lines), at which point top showed it using ~1.7g of RES memory. I grabbed another heap dump. Finally, I also tried test <- {} to get rid of the reference and allow garbage collection. After doing this, printing out test and showing it to be empty, I grabbed another heap dump and noticed RES still showed 1.7g. I used jhat heap0.bin to analyze the original heap dump, and got:

Heap Histogram

All Classes (excluding platform)

Class   Instance Count  Total Size
class [B    25126   14174163
class [C    19183   1576884
class [<other>  11841   1067424
class [Lscala.concurrent.forkjoin.ForkJoinTask; 16  1048832
class [I    1524    769384
...

After running the collect, I have:

Heap Histogram

All Classes (excluding platform)

Class   Instance Count  Total Size
class [C    2784858 579458804
class [B    27768   70519801
class java.lang.String  2782732 44523712
class [Ljava.lang.Object;   2567    22380840
class [I    1538    8460152
class [Lscala.concurrent.forkjoin.ForkJoinTask; 27  1769904

Even after I nulled out test, it remained about the same. This shows us 2,784,858 instances of char[], for a total size of 579MB, and also 2,782,732 instances of String, presumably holding those char[]s. I followed the reference graph all the way up and got something like

char []->字符串-> String []-> ...->类scala.collection.mutable.DefaultEntry->类[Lscala.collection.mutable.HashEntry; ->类scala.collection.mutable.HashMap->类edu.berkeley.cs.amplab.sparkr.JVMObjectTracker $-> java.util.Vector@0x785b48cd8(36字节)-> sun.misc.Launcher$AppClassLoader@0x7855c31a8( 138个字节)

char[] -> String -> String[] -> ... -> class scala.collection.mutable.DefaultEntry -> class [Lscala.collection.mutable.HashEntry; -> class scala.collection.mutable.HashMap -> class edu.berkeley.cs.amplab.sparkr.JVMObjectTracker$ -> java.util.Vector@0x785b48cd8 (36 bytes) -> sun.misc.Launcher$AppClassLoader@0x7855c31a8 (138 bytes)

And then AppClassLoader had something like thousands of inbound references. So somewhere along that chain something should have been removing its reference but failed to do so, causing the entire collected array to sit in memory while we try to fetch a second copy of it.

Finally, to answer your question about hanging after the collect, it appears to have to do with the data not fitting in the R process's memory; here's a thread related to that issue: https://www.mail-archive.com/user@spark.apache.org/msg29155.html

I confirmed that with a smaller file containing only a handful of lines, running collect indeed does not hang.
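
If the PoC doesn't strictly need the full dataset inside R, one workaround sketch is to pull back only a sample or an aggregate instead of collecting everything; this assumes the take() and count() RDD helpers from the same SparkR API used above:

# Sketch: bring back only what the R side actually needs, and leave the
# bulk of the data on the cluster.
n <- count(lines)                # number of lines, computed on the executors
head_lines <- take(lines, 1000)  # only the first 1000 lines are shipped back to R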

