Spark + Scala transformations, immutability & memory consumption overheads


Problem description


I have gone through some videos on YouTube regarding Spark architecture.

Even though lazy evaluation, resilience (recreation of data in case of failures), and good functional programming concepts are reasons for the success of Resilient Distributed Datasets, one worrying factor is the memory overhead caused by multiple transformations, since data immutability means each transformation produces new data.

If I understand the concept correctly, every transformation creates a new dataset, so the memory requirement grows by that many times. If I use 10 transformations in my code, 10 datasets will be created and my memory consumption will increase 10-fold.

e.g.

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

The above example has three transformations: flatMap, map and reduceByKey. Does that imply I need 3X the memory for data of size X?

Is my understanding correct? Is caching the RDD the only solution to address this issue?

Once I start caching, it may spill over to disk due to its large size, and performance would be impacted by disk I/O operations. In that case, is the performance of Hadoop and Spark comparable?
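For reference, a minimal sketch of the persistence options in question, reusing the counts RDD from the example above (the storage level and the follow-up actions shown are illustrative only):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): partitions that do not
// fit in memory are recomputed from the lineage when they are needed again.
// MEMORY_AND_DISK instead spills the partitions that do not fit to local disk,
// which is where the disk I/O cost comes in. A storage level can only be set once per RDD.
val persisted = counts.persist(StorageLevel.MEMORY_AND_DISK)
persisted.count()   // the first action materializes the persisted partitions
persisted.take(10)  // later actions reuse them instead of recomputing the lineage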

EDIT:

From the answer and comments, I have understood lazy initialization and the pipelined processing. My assumption of 3X memory, where X is the initial RDD size, is not accurate.

But is it possible to cache 1X of the RDD in memory and update it over the pipeline? How does cache() work?

Solution

First off, the lazy execution means that functional composition can occur:

scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                 "And yet another test"), 1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27

scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
     | map(word => {println(word);(word,1)}).
     | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
     | collect
This is a test
This
is
a
test
This is another test
This
1+1
is
1+1
another
test
1+1
And yet another test
And
yet
another
1+1
test
2+1
counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))

First, note that I force the parallelism down to 1 so that we can see how this looks on a single worker. I then add a println to each of the transformations so that we can see how the workflow moves. You can see that it processes a line, then processes the output of that line, followed by the reduction. So, there are not separate states stored for each transformation, as you suggested. Instead, each piece of data is run through the entire chain of transformations until a shuffle is needed, as the DAG visualization in the Spark UI also shows.
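Regarding the EDIT about cache(): it does not snapshot every intermediate step. It marks a single RDD in the pipeline for reuse, and it is itself lazy; nothing is stored until the first action runs. A minimal sketch reusing the rdd from above (the variable names and the second action are only there to illustrate the reuse):

// cache() only marks the RDD; nothing is materialized yet
val words = rdd.flatMap(line => line.split(" ")).cache()

// the first action runs the whole pipeline and stores the flatMap output in memory
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _).collect()

// a second action reuses the cached partitions instead of re-reading the input
val distinctWords = words.distinct().count()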

That pipelining is the win from laziness. As to Spark vs. Hadoop MapReduce, there is already a lot out there (just google it), but the gist is that Spark tends to utilize network bandwidth out of the box, giving it a boost right there. Then there are a number of performance improvements gained through laziness, especially if a schema is known and you can utilize the DataFrames API.
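For completeness, a minimal sketch of the same word count through the DataFrames API, assuming a SparkSession named spark (as predefined in spark-shell); the path placeholder mirrors the one in the question:

import org.apache.spark.sql.functions.{col, explode, split}

// reading text yields a Dataset[String] with a single column named "value";
// the known schema lets the optimizer plan the job and manage memory more efficiently
val lines = spark.read.textFile("hdfs://...")

val countsByWord = lines
  .select(explode(split(col("value"), " ")).as("word"))
  .groupBy("word")
  .count()

countsByWord.show()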

So, overall, Spark beats MR hands down in just about every regard.
