Does cache() in spark change the state of the RDD or create a new one?


Question

This question is a follow-up to my previous one: What happens if I cache the same RDD twice in Spark?

When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use), or is a new RDD created that wraps the existing one?

What will happen in the following code:

// Init
JavaRDD<String> a = ... // some initialisation and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

// Case 1, will 'a' be calculated twice in this case 
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);

// Case 2, will the data of the calculation of 'a' 
// be cached in the memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);


Answer


"When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use), or is a new RDD created that wraps the existing one?"

The same RDD is returned:

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

Caching doesn't cause any side effect to the said RDD. If it's already marked for persistence, nothing will happen. If it isn't, the only side effect is registering it with the SparkContext, and that side effect isn't on the RDD itself but on the context.
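As a minimal sketch of that behaviour (the input path and the local SparkContext setup here are hypothetical, not from the question): a repeated cache() call is a no-op, while asking for a different storage level afterwards triggers the UnsupportedOperationException seen in the source above.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CacheIdempotenceSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("cache-sketch").setMaster("local[*]"));

    JavaRDD<String> a = sc.textFile("somePath"); // hypothetical input

    a.cache(); // first call: marks the underlying RDD MEMORY_ONLY, registers it with the context
    a.cache(); // second call: the RDD is already marked, so nothing happens

    try {
      a.persist(StorageLevel.DISK_ONLY()); // a different level is rejected by persist()
    } catch (UnsupportedOperationException e) {
      // "Cannot change storage level of an RDD after it was already assigned a level"
      System.out.println(e.getMessage());
    }

    sc.stop();
  }
}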

Edit:

Looking at JavaRDD.cache, it seems that the underlying call will cause the allocation of another JavaRDD:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())

Where wrapRDD calls JavaRDD.fromRDD:

object JavaRDD {

  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}

This causes the allocation of a new JavaRDD. That said, the internal instance of RDD[T] will remain the same.
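To make that concrete, here is a short sketch against the question's snippet (reusing the hypothetical a from above): each cache() call hands back a fresh JavaRDD wrapper, but every wrapper exposes the same underlying RDD[T], which suggests the data is only marked for caching once.

JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

System.out.println(b == c);             // false: each cache() allocates a new JavaRDD wrapper
System.out.println(b.rdd() == c.rdd()); // true: both wrappers share one underlying RDD[T]
System.out.println(a.rdd() == b.rdd()); // true: a, b and c are all handles on the same RDD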
