If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?

Question

Suppose I had a function to generate a (py)spark data frame, caching the data frame into memory as the last operation.

def gen_func(inputs):
    df = ...  # do stuff...
    df.cache()
    df.count()
    return df
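
As a quick sanity check (a minimal sketch; gen_func and inputs are just the hypothetical function and arguments above), the caller can confirm that the returned frame has been marked for caching:

df = gen_func(inputs)
print(df.is_cached)       # True once cache() has been called on this frame
print(df.storageLevel)    # the default for DataFrame.cache() is MEMORY_AND_DISK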

Per my understanding, Spark's caching works as follows:

  1. When cache/persist plus an action (count()) is called on a data frame, it is computed from its DAG and cached into memory, attached to the object that refers to it.
  2. As long as a reference to that object exists, possibly within other functions/other scopes, the df will remain cached, and all DAGs that depend on the df will use the in-memory cached data as a starting point.
  3. If all references to the df are deleted, Spark marks the cached memory as eligible for garbage collection. It may not be garbage collected immediately, causing some short-term memory pressure (and in particular, memory leaks if you generate cached data and throw it away too quickly), but eventually it will be cleared up.

My question is, suppose I use gen_func to generate a data frame, but then overwrite the original data frame reference (perhaps with a filter or a withColumn).

df = gen_func(inputs)
df = df.filter("some_col = some_val")

In Spark, RDD/DF are immutable, so the reassigned df after the filter and the df before the filter refer to two entirely different objects. In this case, the reference to the original df that was cache/counted has been overwritten. Does that mean that the cached data frame is no longer available and will be garbage collected? Does that mean that the new post-filter df will compute everything from scratch, despite being generated from a previously cached data frame?
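
One empirical way to check (a minimal sketch, assuming a live SparkSession and the gen_func above) is to inspect the physical plan of the reassigned frame: if the cached data is still being used, an InMemoryTableScan / InMemoryRelation node appears in the plan.

df = gen_func(inputs)
df = df.filter("some_col = some_val")
df.explain()   # if the plan contains InMemoryTableScan, the filter is reading from the cache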

I am asking this because I was recently fixing some out-of-memory issues with my code, and it seems to me that caching might be the problem. However, I do not yet fully understand the details of what the safe ways to use cache are, and how one might accidentally invalidate cached memory. What is missing in my understanding? Am I deviating from best practice in doing the above?

Answer

Wanted to make a couple of points to hopefully clarify Spark's behavior with respect to caching.

  1. When you have a

df = ...  # do stuff...
df.cache()
df.count()

...and then somewhere else in your application

another_df = ...  # do *same* stuff...
another_df.some_action()

..., you would expect another_df to reuse the cached df dataframe. After all, reusing the result of a prior computation is the objective of caching. Realizing that, Spark developers made the decision to use analyzed logical plans as the "key" to identify cached dataframes, as opposed to relying on mere references from the application side. In Spark, CacheManager is the component that keeps track of cached computations, in the indexed sequence cachedData:

  /**
   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list
   * should be protected in a "this.synchronized" block which includes the reading of the
   * existing value and the update of the cachedData var.
   */
  @transient @volatile
  private var cachedData = IndexedSeq[CachedData]()

During query planning (in the Cache Manager phase), this structure is scanned for all subtrees of the plan being analyzed, to see if any of them have already been computed. If a match is found, Spark substitutes that subtree with the corresponding InMemoryRelation from cachedData.
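
A quick way to see this plan-based matching in action (a minimal sketch, assuming a SparkSession named spark and a toy spark.range frame instead of the original gen_func output):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(100)
df.cache()
df.count()                    # materializes the cache

# A separately built frame with the same analyzed plan...
another_df = spark.range(100)
another_df.explain()          # ...shows InMemoryTableScan, i.e. it will read the cached data

# The reassigned, filtered frame also builds on top of the cached relation
df = df.filter("id > 50")
df.explain()                  # InMemoryTableScan appears inside this plan as well

Because the lookup is keyed on the analyzed plan rather than on the Python variable, overwriting the df reference in the question does not, by itself, discard or invalidate the cached data.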

  2. The cache() function (a simple synonym for persist()) stores the dataframe with storage level MEMORY_AND_DISK by calling cacheQuery(...) in CacheManager:

      /**
       * Caches the data produced by the logical representation of the given [[Dataset]].
       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
       * recomputing the in-memory columnar representation of the underlying table is expensive.
       */
      def cacheQuery(...

Note that this is different from RDD caching, which uses the MEMORY_ONLY level by default. Once cached, dataframes remain cached either in memory or on local executor disk until they are explicitly unpersist'ed or the CacheManager's clearCache() is called. When executor storage memory fills up completely, cached blocks start being pushed to disk using LRU (least recently used) eviction, but they are never simply "dropped".
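
So when memory pressure is the concern, the reliable way to release cached data is to do so explicitly rather than waiting for references to go out of scope. A minimal sketch (assuming a SparkSession named spark; gen_func and inputs are the question's hypothetical function and arguments):

from pyspark import StorageLevel

df = gen_func(inputs)                    # cached with MEMORY_AND_DISK by default
# ... use df ...
df.unpersist()                           # explicitly drop this dataframe's cached blocks

# Or choose a storage level yourself instead of using cache():
df2 = spark.range(100)
df2.persist(StorageLevel.MEMORY_ONLY)    # RDD-style, memory-only caching
df2.count()

# Last resort: drop everything the CacheManager is tracking
spark.catalog.clearCache()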

Good question, by the way...
