Memory Usage of sc.textFile vs sc.wholeTextFiles + flatMapValues

Problem Description

I have a set of log files I would like to read into an RDD. These log files are all compressed gzip files and the filenames are date stamped.

I've been using sc.wholeTextFiles() to read in the files and it seems like I've been running into Java heap memory issues. To isolate the problem, I decided to run it against a single file on a single machine as a test case.

I obtained the file from here:

http://dumps.wikimedia.org/other/pagecounts-raw/

Here are the sizes of the file, both compressed and uncompressed versions:

myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser  65170192 Sep 20  2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt

and the available memory on the machine is as follows:

myuser@fembuntu:~$ free -tm

       total       used       free     shared    buffers     cached
Mem:    4856       3018       1838        123         27        407
-/+ buffers/cache: 2583       2273
 Swap:  5080        849       4231
Total:  9937       3867       6069

So I fired up the spark-shell, giving the executor 2G of memory:

$ spark-shell --executor-memory 2G

scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz

Here I read in the data via sc.textFile() and display the first two lines:

scala>  var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31

scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)

That works fine.

Here I use sc.wholeTextFiles(), splitting on the newline via flatMapValues() to obtain a pair RDD whose rows are key-value pairs. The values correspond to the rows in the RDD obtained by using sc.textFile(), and the key is the file path.

scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31

But I get a heap error when I execute an action:

scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)

Can anyone explain what's going on here? Why does the flatMapValues call to split the lines seem to blow Java heap memory usage out of the water, resulting in a heap error?

Recommended Answer

The problem you experience is not really specific to the textFile vs. wholeTextFiles-with-flatMapValues scenario. It looks like your program doesn't even get to the point where the data is flattened, and I am pretty sure you'll get the same exception when you call count instead of mapValues.
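
A quick way to check this claim is the following minimal spark-shell sketch (it reuses the pc_loc and filename values defined in the question and is not part of the original answer):

scala> // No flattening involved at all: materializing even the first record of
scala> // wholeTextFiles forces the entire decompressed file to be decoded into a
scala> // single String, so under the same 2G heap this would likely fail with the
scala> // same OutOfMemoryError.
scala> sc.wholeTextFiles(pc_loc + filename).take(1)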

In practice it is just a matter of creating large objects. Remember that wholeTextFiles has to read the complete content of each file at once; it cannot be partially spilled to disk or partially garbage collected. While 200MB or so is not particularly impressive, it is quite a lot to be handled by a single object. Moreover, it has to reside on a single machine, which makes it much harder to distribute the load.

Unlike wholeTextFiles, textFile provides much higher granularity in this particular case. Individual objects have to handle significantly less data and can be efficiently garbage collected when no longer required.
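
If the file path really is needed as the key, a rough alternative sketch (pair_rdd2 is just an illustrative name; pc_loc and filename are reused from the question) keeps textFile's per-line granularity and simply attaches the path, which is already known before the read:

scala> // One small record per line instead of one ~233 MB record per file; the key
scala> // is the path string captured from the driver, so no single object ever has
scala> // to hold the whole file.
scala> val path = pc_loc + filename
scala> val pair_rdd2 = sc.textFile(path).map(line => (path, line))
scala> pair_rdd2.take(2)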

Ignoring the size of the objects, it looks like you're using Spark in local mode, which means everything is handled by a single JVM. Since the heap is shared by all threads, the amount of memory available for actual processing can be lower than you expect.
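
One way to see how much heap that single JVM actually has (a sketch, not part of the original answer) is to ask it directly from the spark-shell prompt:

scala> // Maximum heap of the one local-mode JVM, in MB; with default settings this
scala> // is usually far less than the free RAM reported by free -tm.
scala> Runtime.getRuntime.maxMemory / (1024 * 1024)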

Finally, you should remember that only a part of the available memory is reserved for the heap. See Garbage Collector Ergonomics and How is the default Java heap size determined?. If you have to process large objects, you can always override the default initial and maximum heap sizes using the -Xms / -Xmx Java options.
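
As a hedged example (not from the original answer): in local mode the executors run inside the driver JVM, so raising the driver memory is what effectively raises -Xmx for the JVM doing the work, whereas --executor-memory alone typically does not enlarge that heap:

$ # 4G is just an example value; pick something that fits the machine's free RAM.
$ spark-shell --driver-memory 4G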
