How to check if data is cached in dataframe or not yet cached due to lazy execution in Pyspark?


Question

My question is a little different from other questions I could find on Stack Overflow. I need to know if the data has already been retrieved and stored in a dataframe, or if that is yet to happen.

I am doing something like this:

df1=spark.table("sourceDB.Table1")
df1.cache()

Now, as you might be aware, the data has not yet been read from the source table due to lazy execution. So I need an expression here that evaluates to "False" at this point.

After some time, I perform an operation that requires the data to be retrieved from the source. For example:

from pyspark.sql.functions import sum
df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1", "col3").filter("sum_of_col1 >= 100").show()

At this point, the data must have been read and stored in the cache for df1. So I need an expression here that evaluates to "True" at this point.

Is there any way we can achieve this? I believe df1.is_cached will not help in this situation.

Answer

Perhaps this is helpful.

1. If you want to check whether cache/persist has already been triggered on the dataframe, you can use the cacheManager to confirm it, as below (note that the snippets in this answer are Scala):

spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty

2. If you want to check whether the data is actually in memory, perhaps the method below will help:

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
  val manager = df.sparkSession.sharedState.cacheManager
  // step 1 - check if dataframe.cache was issued earlier or not
  if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) { // cache statement was already issued
    println("Cache statement is already issued on this dataframe")
    // step 2 - check if the data is in memory or not
    val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
    cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
  } else false
}

3. Testing the method above:

val df = spark.read
  .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
    ".parquet").getPath)
println(checkIfDataIsInMemory(df))
/**
  * false
  */

df.cache()
// check if the data is cached
println(checkIfDataIsInMemory(df))
/**
  * Cache statement is already issued on this dataframe
  * false
  */

println(df.count())
println(checkIfDataIsInMemory(df))
/**
  * 1
  * Cache statement is already issued on this dataframe
  * true
  */

