When to cache a DataFrame?


Problem description


My question is: when should I do dataframe.cache(), and when is it useful?

Also, in my code, should I cache the dataframes at the commented lines?

Note: My dataframes are loaded from a Redshift DB.

Many thanks

Here is my code:

# Assumes `from pyspark.sql import functions as func`, and that `tables`,
# `manager`, `udf_currency_exchange`, and the constant `EUR` are defined elsewhere.
def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe

Solution

when should I do dataframe.cache() and when is it useful?

Cache what you are going to reuse across queries (and do it early, and only up to the available memory). It does not really matter which programming language you use (Python, Scala, Java, SQL or R), as the underlying mechanics are the same.
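
For instance, here is a minimal PySpark sketch (the path and column names are hypothetical) of a DataFrame worth caching, because two downstream queries reuse it:

# Hypothetical example: `events` feeds two separate queries, so caching it
# avoids re-reading and re-filtering the source for each action.
events = spark.read.parquet("/tmp/events").where("status IS NOT NULL").cache()
per_user = events.groupBy("user_id").count()
errors = events.where("status = 'ERROR'")
per_user.show()   # first action scans the source once and fills the cache
errors.show()     # second action reads from the cache instead of the source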

You can see whether a DataFrame is cached by inspecting its physical plan with the explain operator (InMemoryRelation entries reflect cached datasets together with their storage level):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
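
As a rough PySpark illustration (not necessarily the exact snippet behind the plan above), a plan of this shape can be reproduced with:

df = spark.range(1).cache()              # marks the DataFrame for caching
df.withColumn("newId", df.id).explain()  # the plan now shows InMemoryRelation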

After you cache (or persist) your DataFrame, the first query may get slower (it pays the one-time cost of materializing the cache), but it pays off for the following queries.

You can check whether a Dataset has been cached using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
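
In PySpark, which the question uses, there is a simpler built-in check:

print(dataframe.is_cached)     # True once dataframe.cache() has been called
print(dataframe.storageLevel)  # e.g. StorageLevel(True, True, False, True, 1)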

Also, in my code, should I cache the dataframes at the commented lines?

Yes and no. Cache what represents external datasets, so that you don't pay the extra price of transmitting data across the network (accessing the external storage) every time you query over them.

Don't cache what you use only once or what is cheap to compute. Otherwise, cache.
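
Applied to the code in the question (a judgment sketch, not a hard rule): within a single call, each intermediate DataFrame feeds exactly one downstream step, so caching at the commented spots buys little. Caching pays off for the Redshift-backed df_vta and df_cpa if the function is invoked repeatedly, or for the returned result if the caller runs several actions on it:

# Hypothetical caller-side sketch; `df` stands for the input DataFrame.
result = sub_tax_transfer_pricing_eur_aux(manager, df, seq_recs, seq_reservas, df_aux)
result.cache()   # worthwhile only if several actions follow
result.count()   # first action pays the one-time materialization cost
result.show()    # subsequent actions reuse the cached rows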


Be careful what you cache, i.e. exactly which Dataset is cached, as different cache points lead to different cached plans (and hence different queries benefit):

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)


There's one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price of having rows cached on the very first action, but that only happens with the DataFrame API. In SQL, caching is eager, which makes a huge difference in query performance, as you don't have to call an action to trigger caching.
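
A minimal PySpark sketch (hypothetical view name) contrasting the two behaviors:

# DataFrame API: cache() is lazy -- nothing is materialized until an action.
df = spark.range(10).cache()
df.count()                          # this first action actually fills the cache

# SQL: CACHE TABLE is eager -- the scan runs immediately.
spark.range(10).createOrReplaceTempView("nums")
spark.sql("CACHE TABLE nums")       # materializes right away, no action needed
# (CACHE LAZY TABLE nums would opt back into lazy behavior)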
