If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?

Question

Suppose I had a function to generate a (py)spark data frame, caching the data frame into memory as the last operation.

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
   return df

Per my understanding, Spark's caching works as follows:

  1. When cache/persist plus an action (count()) is called on a data frame, it is computed from its DAG and cached into memory, affixed to the object which refers to it (see the sketch after this list).
  2. As long as a reference exists to that object, possibly within other functions/other scopes, the df will continue to be cached, and all DAGs that depend on the df will use the in-memory cached data as a starting point.
  3. If all references to the df are deleted, Spark puts up the cache as memory to be garbage collected. It may not be garbage collected immediately, causing some short-term memory blocks (and in particular, memory leaks if you generate cached data and throw them away too fast), but eventually it will be cleared up.
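
Point 1 can be checked directly from a PySpark shell. A minimal sketch, assuming an active SparkSession named spark:

sc = spark.sparkContext          # handle used by getPersistentRDDs() below

df = spark.range(10)             # small illustrative dataframe
df.cache()                       # marks df for caching; lazy, nothing is computed yet
df.is_cached                     # True as soon as cache() is called (storage level is set)
df.count()                       # the action runs the DAG and materializes the cache
sc._jsc.getPersistentRDDs()      # the cached entry now shows up here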

My question is, suppose I use gen_func to generate a data frame, but then overwrite the original data frame reference (perhaps with a filter or a withColumn).

df=gen_func(inputs)
df=df.filter("some_col = some_val")

In Spark, RDD/DF are immutable, so the reassigned df after the filter and the df before the filter refer to two entirely different objects. In this case, the reference to the original df that was cache/counted has been overwritten. Does that mean that the cached data frame is no longer available and will be garbage collected? Does that mean that the new post-filter df will compute everything from scratch, despite being generated from a previously cached data frame?

I am asking this because I was recently fixing some out-of-memory issues with my code, and it seems to me that caching might be the problem. However, I do not really understand the full details yet of what are the safe ways to use cache, and how one might accidentally invalidate one's cached memory. What is missing in my understanding? Am I deviating from best practice in doing the above?

Solution

I've done a couple of experiments, shown below. Apparently, the dataframe, once cached, remains cached (as shown by getPersistentRDDs and the query plan - InMemory etc.), even if all Python references are overwritten or deleted altogether using del, and even with garbage collection explicitly invoked.

Experiment 1:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df2.select('*').explain()

del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}

>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#172L AS col1#174L]
                  +- *(1) Scan ExistingRDD[_1#172L]

>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}

Experiment 2:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df.select('*').explain()

del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}

>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}

>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#218L AS col1#220L]
                  +- *(1) Scan ExistingRDD[_1#218L]

>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}

Experiment 3 (control experiment, to show that unpersist works)

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
df2.select('*').explain()

df.unpersist()
df2.select('*').explain()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}

>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#310L AS col1#312L]
                  +- *(1) Scan ExistingRDD[_1#310L]

>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
   +- *(1) Scan ExistingRDD[_1#310L]

To answer the OP's question:

Does that mean that the cached data frame is no longer available and will be garbage collected? Does that mean that the new post-filter df will compute everything from scratch, despite being generated from a previously cached data frame?

The experiments suggest no to both questions. The dataframe remains cached, it is not garbage collected, and the new dataframe is computed using the cached (no longer referenceable) dataframe, according to the query plan.

Some helpful functions related to cache usage (if you don't want to do it through the Spark UI) are:

sc._jsc.getPersistentRDDs(), which shows a list of cached RDDs/dataframes, and

spark.catalog.clearCache(), which clears all cached RDDs/dataframes.
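
For example, a minimal sketch of using both from a PySpark shell, assuming an active SparkSession spark and its SparkContext sc:

sc._jsc.getPersistentRDDs()      # e.g. {71: JavaObject id=o234} while something is cached
spark.catalog.clearCache()       # unpersists every cached dataframe/table in the session
sc._jsc.getPersistentRDDs()      # {} afterwards (when only dataframes had been cached)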

Am I deviating from best practice in doing the above?

I am no authority to judge you on this, but as one of the comments suggested, avoid reassigning to df, because dataframes are immutable. Try to imagine you're coding in Scala and you defined df as a val: doing df = df.filter(...) would be impossible. Python can't enforce that per se, but I think the best practice is to avoid overwriting any dataframe variables, so that you can always call df.unpersist() afterwards once you no longer need the cached results.
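
A rough sketch of that pattern, reusing the OP's gen_func and inputs from the question (illustrative names only):

cached_df = gen_func(inputs)                            # cached inside gen_func (cache + count)
filtered_df = cached_df.filter('some_col = some_val')   # derive under a new name, don't reassign
# ... work with filtered_df; its plan still reads from the InMemoryRelation ...
cached_df.unpersist()                                   # release the cached data once it's no longer needed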
