How to check if data is cached in a dataframe or not yet cached due to lazy execution in PySpark?
Question
My question is slightly different from others I could find on Stack Overflow. I need to know whether the data has already been retrieved and stored in a dataframe, or whether that is yet to happen.
I am doing something like this:
df1=spark.table("sourceDB.Table1")
df1.cache()
Now, as you might be aware, the data has not been read from the source table yet due to lazy execution. So at this point I need an expression that evaluates to "False".
After some time, I perform an operation that requires the data to be retrieved from the source. For example:
df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()
At this point, the data must have been read and stored in the cache for df1. So here I need an expression that evaluates to "True".
Is there any way we can achieve this? I believe df1.is_cached will not help in this situation.
Recommended answer
Perhaps this is helpful.
1. If you want to check whether cache/persist has already been triggered on the dataframe, you can use the CacheManager to confirm it, as below:
spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty
2. If you want to check whether the data is actually in memory, perhaps the method below will help:
import org.apache.spark.sql.DataFrame

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // step 1 - check whether dataframe.cache was issued earlier
  manager.lookupCachedData(df.queryExecution.logical) match {
    case Some(cachedData) => // cache statement was already issued
      println("Cache statement is already issued on this dataframe")
      // step 2 - check whether the cached column buffers are actually loaded in memory
      cachedData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
    case None => false
  }
}
3. Testing the above method:
val df = spark.read
.parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
* false
*/
df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
* Cache statement is already issued on this dataframe
* false
*/
println(df.count())
println(checkIfDataIsInMemory(df))
/**
* 1
* Cache statement is already issued on this dataframe
* true
*/