When to cache a DataFrame?


Problem description

My question is, when should I do dataframe.cache() and when is it useful?

Also, in my code, should I cache the dataframes at the commented lines?

Note: My dataframes are loaded from a Redshift DB.

Thanks a lot.

Here is my code:

def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe

Recommended answer

When should I do dataframe.cache() and when is it useful?

cache what you are going to use across queries (and early, and often, up to the available memory). It does not really matter which programming language you use (Python, Scala, Java, SQL or R), as the underlying mechanics are the same.
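For example (a minimal PySpark sketch, since the question's code is PySpark; the source path and column names are made up for illustration), a DataFrame that several separate queries read benefits from a single cache call:

events = spark.read.parquet("/some/path")             # hypothetical external source
events.cache()                                         # keep it around for the queries below

events.filter(events.status == "OK").count()           # 1st query: also fills the cache
events.groupBy("status").count().show()                # 2nd query: served from the cache
events.select("user_id").distinct().count()            # 3rd query: served from the cache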

You can see whether a DataFrame was cached in your physical plan using the explain operator (where InMemoryRelation entities reflect cached datasets with their storage level):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
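
A rough PySpark equivalent of the snippet behind that plan could look like this (a sketch, not the answer's exact code):

df = spark.range(1).cache()
df.count()                                    # materialize the cache
df.selectExpr("id", "id AS newId").explain()  # shows InMemoryTableScan / InMemoryRelation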

After you cache (or persist) your DataFrame, the first query may get slower, but it is going to pay off for the following queries.
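
A quick way to see this trade-off in the question's setting (the table name below is hypothetical; manager and tables come from the question's code) is to time the same action twice:

import time

df = manager.get_dataframe(tables['some_redshift_table']).cache()

start = time.time(); df.count(); print("1st action:", time.time() - start)  # pays the caching cost
start = time.time(); df.count(); print("2nd action:", time.time() - start)  # reads the cached rows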

You can check whether a Dataset was cached or not using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
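
On the PySpark side (which the question uses), a rough equivalent, assuming Spark 2.1+ for the storageLevel property, is:

df = spark.range(5)
df.is_cached        # False
df.storageLevel     # no storage enabled yet

df.cache()
df.is_cached        # True
df.storageLevel     # reflects the default MEMORY_AND_DISK level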

Also, in my code, should I cache the dataframes at the commented lines?

Yes and no. Cache what represents external datasets, so that you don't pay the extra price of transmitting data across the network (while accessing the external storage) every time you query over them.

Don't cache what you use only once or what is easy to compute. Otherwise, cache.
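
Applied to the code in the question (a hedged sketch, not a definitive answer): df_vta and df_cpa are each consumed by a single join, so caching them buys nothing, and caching dataframe only helps if it ends up being read more than once, e.g. if the caller runs several actions on the returned result. In that case one cache right after the Redshift load and filter is enough:

dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
    .filter(dataframe.seq_reserva.isin(seq_reservas)) \
    .cache()
# ... the joins and withColumn calls from the question go here ...
# and once the result has been written or collected:
dataframe.unpersist()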

Be careful what you cache, i.e. which Dataset gets cached, because caching at different points gives you different cached plans (and hence different queries can reuse them):

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)


There is one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price to have rows cached on the very first action, but that only happens with the DataFrame API. In SQL, caching is eager, which makes a huge difference in query performance, as you do not have to call an action to trigger caching.
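
In PySpark terms the difference looks roughly like this (the view name is illustrative):

df = spark.range(10)
df.cache()                          # lazy: nothing is materialized yet
df.count()                          # the first action pays the caching cost

df.createOrReplaceTempView("t")
spark.sql("CACHE TABLE t")          # eager: rows are cached right away
# spark.sql("CACHE LAZY TABLE t")   # the LAZY keyword opts back into lazy caching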
