Why is rdd.map(identity).cache slow when rdd items are big?


Problem description


I found out that when using .map( identity ).cache on an RDD, it becomes very slow if the items are big, while it is pretty much instantaneous otherwise.

Note: this is probably related to this question, but here I provide a very precise example (that can be executed directly in spark-shell):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count )                 // around 12 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!

I first assumed that this was the time needed to create a new RDD (container). But if I use an RDD of the same size with small items, there is only a tiny difference in execution time:

val rdd = sc.parallelize(1 to n).cache
rdd.count

profile( rdd.count )                 // around 9 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 15 ms

So, it looks like caching is actually copying the data. I thought it might also be losing time serializing it, but I checked that cache uses the default MEMORY_ONLY persistence:

import org.apache.spark.storage.StorageLevel
rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true

=> So, is caching copying data, or is it something else?

This is really a major limitation for my application, because I started with a design that uses something similar to rdd = rdd.map(f: Item => Item).cache, which can be used with many such functions f applied in arbitrary order (an order that I cannot determine beforehand).

I am using Spark 1.6.0

Edit

When I look at the spark ui -> stage tab -> the last stage (i.e. 4), all tasks have pretty much the same data with:

  • duration = 3s (it went down to 3s, but that's still 2.9 too much :-\ )
  • scheduler 10ms
  • task deserialization 20ms
  • gc 0.1s (all tasks have that, but why would gc be triggered???)
  • result serialization 0ms
  • getting result 0ms
  • peak exec mem 0.0B
  • input size 7.0MB/125
  • no errors

Solution

A jstack of the process running the org.apache.spark.executor.CoarseGrainedExecutorBackend during the slow caching reveals the following:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

The SizeEstimator makes sense as one of the main costs of caching something which is ostensibly already in memory, since proper size estimation for unknown objects can be fairly difficult. If you look at the visitSingleObject method, you can see that it relies heavily on reflection, calling getClassInfo to access runtime type information. Not only does the full object hierarchy get traversed, but each nested member is also checked against an IdentityHashMap to detect which references point to the same concrete object instance, which is why the stack traces show so much time in those IdentityHashMap operations.
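To make that cost model concrete, here is a minimal sketch of an identity-deduplicated object-graph walk in plain Scala. It is not Spark's SizeEstimator (which inspects fields through reflection); countReachable is a hypothetical helper that only follows object arrays, Scala collections and tuples/case classes. It does show the same pattern, though: one IdentityHashMap lookup and insert per visited object, so the work grows with the number of objects rather than with the number of bytes.

```scala
import java.util.IdentityHashMap
import scala.collection.mutable

// Hypothetical helper (NOT Spark's SizeEstimator): counts the distinct objects
// reachable from `root`, deduplicating by reference identity.
def countReachable(root: AnyRef): Long = {
  val visited = new IdentityHashMap[AnyRef, AnyRef]()
  val stack = mutable.ArrayBuffer[AnyRef](root)
  var count = 0L
  while (stack.nonEmpty) {
    val obj = stack.remove(stack.length - 1)
    if (obj != null && !visited.containsKey(obj)) {
      visited.put(obj, obj) // one identity-map insert per distinct object
      count += 1
      obj match {
        case arr: Array[AnyRef] => arr.foreach(stack += _)
        case it: Iterable[_]    => it.foreach(e => stack += e.asInstanceOf[AnyRef])
        case p: Product         => p.productIterator.foreach(e => stack += e.asInstanceOf[AnyRef])
        case _                  => () // leaf: primitive arrays, strings, boxed values, ...
      }
    }
  }
  count
}

val shared = Array[AnyRef]("a")
println(countReachable(Array[AnyRef](shared, shared))) // 3: outer array, shared array, "a"
println(countReachable(new Array[Int](1000000)))       // 1: a primitive array is a single object
```

The second call is the interesting one: a million ints in a primitive array are one object to visit, whereas a million boxed values would each need their own visit.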

In the case of your example objects, each item is basically a list of maps from wrapped integers to wrapped integers; presumably Scala's implementation of the inner map holds an array as well, which explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit here, and SizeEstimator sets up a fresh IdentityHashMap for each object sampled.

In the case where you measure:

profile( rdd.cache.count )

this doesn't count as exercising the caching logic since the RDD has already been successfully cached, so Spark is smart enough not to re-run the caching logic. You can actually isolate out the exact cost of the caching logic independently of the extra "map(identity)" transformation by profiling your fresh RDD creation and caching directly; here's my Spark session continuing from your last few lines:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms                                                                   
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms                                                                  
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms                                                                  
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms                                                                  
res7: Long = 1000

So you can see that the slowness didn't come from the fact that you ran through a map transformation per se. Rather, the ~6s appears to be the fundamental cost of running the caching logic over 1000 items when each item is a nested collection of maps and boxed integers that SizeEstimator has to walk object by object. As for how many inner objects that is: every tuple built inside bigContent() shares the key i, so toMap keeps only one entry per inner map, and each item ends up holding on the order of thousands of inner objects (1000 small maps plus their boxed keys and values) rather than millions. The extra visitArray nesting in the top stack trace shows the estimator descending into arrays nested inside other arrays, which is what you'd expect from array-backed collection internals.
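When estimating how many inner objects an item holds, it helps to inspect one inner map directly. The following is a minimal sketch in plain Scala (no Spark needed) that reuses the inner expression of bigContent() for a fixed i; the names pairs, inner and byJ are only for illustration, and byJ is a hypothetical variant that is not part of the question.

```scala
// Same construction as the inner expression of bigContent(), for a fixed i.
val i = 1
val pairs = (1 to 1000).map(j => (i, j)) // 1000 tuples, all with the same key i
val inner = pairs.toMap                  // later pairs overwrite earlier ones

println(pairs.size) // 1000
println(inner.size) // 1
println(inner)      // Map(1 -> 1000)

// Hypothetical variant keyed by j instead of i: every key is distinct,
// so all 1000 entries survive.
val byJ = (1 to 1000).map(j => (j, i)).toMap
println(byJ.size)   // 1000
```

Because toMap keeps the last value seen for each key, the maps built in bigContent() have a single entry each, which is worth keeping in mind when comparing item sizes across the experiments.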

For your concrete use case, you should err on the side of lazy caching if possible, since there's overhead associated with caching intermediate results that's not a good tradeoff if you're not really going to reuse the intermediate results for lots of separate downstream transformations. But as you mention in your question, if you're indeed using one RDD to branch out into multiple different downstream transformations, you might indeed need the caching step if the original transformations are at all expensive.

The workaround is to try to use inner data structures that are more amenable to constant-time size calculations (e.g. arrays of primitives), which saves a lot by avoiding iteration over huge numbers of wrapper objects and the reflection SizeEstimator needs for each of them.

I tried things like Array[Array[Int]], and even though there's still nonzero overhead, it's about 10x better (763ms below versus ~6.5s above), even with each item holding a full 1000 x 1000 grid of ints:
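If the items already exist as maps, one possible way to apply this is to pack each map into a flat primitive array before caching and unpack it when needed. The sketch below is only an illustration under that assumption; pack and unpack are hypothetical helpers, and the alternating key/value layout is just one of several reasonable encodings.

```scala
// Hypothetical helpers: store a Map[Int, Int] as a flat Array[Int] of
// alternating key/value pairs, so the cached item contains no boxed objects.
def pack(m: Map[Int, Int]): Array[Int] = {
  val out = new Array[Int](m.size * 2)
  var i = 0
  for ((k, v) <- m) {
    out(i) = k
    out(i + 1) = v
    i += 2
  }
  out
}

// Rebuild the map from the alternating key/value layout.
def unpack(a: Array[Int]): Map[Int, Int] =
  a.grouped(2).map(p => p(0) -> p(1)).toMap

val original = Map(1 -> 2, 3 -> 4)
println(unpack(pack(original)) == original) // true
```

In spark-shell, and assuming an RDD whose items are IndexedSeq[Map[Int, Int]] as in the question, this could be used as rdd.map(item => item.map(pack).toArray).cache, which yields an RDD[Array[Array[Int]]].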

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms                                                                    
res20: Long = 1000

To illustrate just how bad the cost of reflection on any fancier objects is: if I remove the last toArray there and end up with each bigContent being a scala.collection.immutable.IndexedSeq[Array[Int]], the performance goes back to being within ~2x of the slowness of the original IndexedSeq[Map[Int,Int]] case:

scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms                                                                   
res25: Long = 1000

As discussed in the comment section, you can also consider using the MEMORY_ONLY_SER StorageLevel. As long as there's an efficient serializer, serializing can quite possibly be cheaper than the recursive reflection used in SizeEstimator. To do that, you'd just replace cache() with persist(StorageLevel.MEMORY_ONLY_SER); as mentioned in another question, cache() is conceptually the same thing as persist(StorageLevel.MEMORY_ONLY).

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )

I actually tried this on both Spark 1.6.1 and Spark 2.0.0-preview running with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image-versions, respectively). Unfortunately the MEMORY_ONLY_SER trick didn't appear to help in Spark 1.6.1:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms                                                                   
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms                                                                   
res21: Long = 1000

But in Spark 2.0.0-preview it seemed to improve performance by 10x:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms                                                                   
res20: Long = 1000

This could vary depending on your objects, though. A speedup would only be expected if serialization itself isn't using tons of reflection anyway. If you're able to use Kryo serialization effectively, then it's likely you'll see an improvement from MEMORY_ONLY_SER for these large objects.
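For reference, Kryo is enabled through the spark.serializer setting. The fragment below is a sketch of how that might look for a standalone application; the application name and the registered classes are assumptions chosen to match the Array[Array[Int]] example, not something measured in the sessions above.

```scala
import org.apache.spark.SparkConf

// Configuration sketch: switch the serializer used for MEMORY_ONLY_SER
// (and shuffles) from Java serialization to Kryo.
val conf = new SparkConf()
  .setAppName("kryo-cache-sketch") // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes is optional but lets Kryo avoid writing full class names.
  .registerKryoClasses(Array[Class[_]](classOf[Array[Int]], classOf[Array[Array[Int]]]))
```

In spark-shell the SparkContext already exists, so the same setting would have to be passed at launch instead, e.g. with --conf spark.serializer=org.apache.spark.serializer.KryoSerializer.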
