Memory Usage of sc.textFile vs sc.wholeTextFiles + flatMapValues
Question
I have a set of log files I would like to read into an RDD. These log files are all compressed gzip files and the filenames are date stamped.
I've been using sc.wholeTextFiles() to read in the files, and it seems like I've been running into Java heap memory issues. To isolate the problem, I decided to run it against a single file on a single machine as a test case.
I obtained the file from here:
http://dumps.wikimedia.org/other/pagecounts-raw/
Here are the sizes of the file, both compressed and uncompressed versions:
myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser 65170192 Sep 20 2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt
and the available memory on the machine is as follows:
myuser@fembuntu:~$ free -tm
total used free shared buffers cached
Mem: 4856 3018 1838 123 27 407
-/+ buffers/cache: 2583 2273
Swap: 5080 849 4231
Total: 9937 3867 6069
So I fired up the spark-shell, giving the executor 2G of memory:
$ spark-shell --executor-memory 2G
scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz
Here I read in the data via sc.textFile() and display the first 2 lines:
scala> var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31
scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)
That works fine.
Here I use sc.wholeTextFiles() and split on the newline via flatMapValues() to obtain a pair RDD whose rows are key-value pairs. The values correspond to the rows in the RDD obtained using sc.textFile(); the key is the filepath.
scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31
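For intuition, the effect of flatMapValues can be sketched without Spark on a plain Scala collection (the file path and contents below are made-up stand-ins, not the real data):

```scala
// Plain-Scala sketch of what wholeTextFiles + flatMapValues produces:
// the whole file arrives as a single (path, contents) pair, and only then
// is the contents string split into one (path, line) pair per line.
object FlatMapValuesSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the single (filepath, wholeFileContents) record
    val wholeFile = Seq(
      ("file:/pagecounts-20090505-180000.gz",
       "aa.b Help:Books 1 5263\naa.b Main_Page 1 5416")
    )
    // Equivalent of flatMapValues(y => y.split("\n")): the key is repeated per line
    val pairs = wholeFile.flatMap { case (k, v) => v.split("\n").map(k -> _) }
    pairs.foreach(println)
  }
}
```

The key point: the split happens only after the entire file contents already exist as one string.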
But I get a heap error when I execute an action:
scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)
Can anyone explain what's going on here? Why does the flatMapValues call to split the lines seem to blow Java heap memory usage out of the water, resulting in a heap error?
Answer
The problem you experience is not really specific to the textFile vs wholeTextFiles with flatMapValues scenario. It looks like your program doesn't even get to the point where the data is flattened, and I am pretty sure you'll get the same exception when you call count instead of mapValues.
In practice, it is just a matter of creating large objects. Remember that wholeTextFiles has to read the complete content of the file at once; it cannot be partially spilled to disk or partially garbage collected. While 200 MB or so is not particularly impressive, it is quite a lot to be handled by a single object. Moreover, it has to reside on a single machine, which makes the load much harder to distribute.
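A rough back-of-the-envelope sketch shows why one 233 MB file strains a 2 GB heap. This assumes the JVM stores the decoded text as UTF-16 (2 bytes per char) and that the decode step (Text.decode via CharsetDecoder, per the stack trace) needs a transient CharBuffer of similar size; the exact overheads vary by JVM version and GC:

```scala
// Hedged estimate only: the order of magnitude, not the exact figures,
// is what explains the OutOfMemoryError in the stack trace.
object HeapEstimate {
  def main(args: Array[String]): Unit = {
    val uncompressedBytes = 233007266L              // size of the .txt from ls -ltr
    val asUtf16String     = uncompressedBytes * 2   // one giant String, ~466 MB
    val decodePeak        = asUtf16String * 2       // plus the transient CharBuffer
    println(s"String alone: ${asUtf16String / 1000000} MB")
    println(s"Rough peak while decoding: ${decodePeak / 1000000} MB")
  }
}
```

Nearly a gigabyte of transient allocation for a single record leaves little headroom in a 2 GB heap that is also running everything else.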
Unlike wholeTextFiles, textFile provides much higher granularity in this particular case. Individual objects have to handle significantly less data and can be efficiently garbage collected when no longer required.
Ignoring the size of the objects, it looks like you're using Spark in local mode, which means everything is handled by a single JVM. Since the heap is shared by all threads, the amount of memory available for actual processing can be lower than you expect.
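One quick sanity check, using only the standard JVM Runtime API (nothing Spark-specific), is to ask the running JVM how much heap it actually has:

```scala
// Prints the maximum heap this JVM will attempt to use; in local mode this
// single value bounds the driver, the executors, and the shell all at once.
object HeapCheck {
  def main(args: Array[String]): Unit = {
    val maxHeapMb = Runtime.getRuntime.maxMemory / (1024 * 1024)
    println(s"Max heap available to this JVM: $maxHeapMb MB")
  }
}
```

Running this inside the spark-shell often reveals a smaller number than the flags suggest.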
Finally, you should remember that only a part of the available memory is reserved for the heap. See Garbage Collector Ergonomics and How is the default Java heap size determined?. If you have to process large objects, you can always override the default initial and maximum heap sizes using the -Xms / -Xmx Java options.
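For example (the flag values here are illustrative, not a recommendation): since in local mode the driver JVM does the actual work, raising the driver heap is what usually helps, rather than --executor-memory alone:

```shell
# Give the driver JVM a larger heap (local mode: the driver is the worker)
spark-shell --driver-memory 4G

# Or set the JVM heap bounds directly for a plain Java program
java -Xms1g -Xmx4g MyApp
```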