How to check if data is cached in dataframe or not yet cached due to lazy execution in Pyspark?
Question
My question is a little different from the other questions I could find on Stack Overflow. I need to know whether the data has already been retrieved and stored in a dataframe, or whether that has yet to happen.
I am doing something like this:
df1=spark.table("sourceDB.Table1")
df1.cache()
Now, as you might be aware, the data has not yet been read from the source table because of lazy execution. So at this point I need an expression that evaluates to "False".
Some time later, I perform an operation that requires the data to be retrieved from the source. For example:
df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()
At this point, the data for df1 must have been read and stored in the cache. So here I need an expression that evaluates to "True".
Is there any way we can achieve this? I believe df1.is_cached will not help in this situation.
Recommended answer
Maybe this is helpful.
1. If you want to check whether cache/persist has already been triggered on the dataframe, you can use the CacheManager to confirm that, as below -
spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty
2. If you want to check whether the data is actually in memory, perhaps the method below will help -
import org.apache.spark.sql.DataFrame

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // step 1 - check whether cache()/persist() was issued on this dataframe earlier
  if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) { // cache statement was already issued
    println("Cache statement is already issued on this dataframe")
    // step 2 - check whether the cached column buffers are actually loaded in memory
    val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
    cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
  } else false
}
3. Testing the above method -
val df = spark.read
.parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
* false
*/
df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
* Cache statement is already issued on this dataframe
* false
*/
println(df.count())
println(checkIfDataIsInMemory(df))
/**
* 1
* Cache statement is already issued on this dataframe
* true
*/