When are cache and persist executed (since they don't seem like actions)?


Question

I am implementing a Spark application, of which below is a sample snippet (not the exact same code):

val rdd1 = sc.textFile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)

On checking the performance of this code from the Spark Application Master UI, I see an entry for the count action, but not for the persist. The DAG for this count action also has a node for the 'map' transformation (line 2 of the above code).

Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?

Also, at what point is rdd2 actually persisted? I understand that only two types of operations can be called on RDDs - transformations and actions. If the RDD is persisted lazily when the count action is called, would persist be considered a transformation or an action or neither?

Answer

Dataset's cache and persist operators are lazy and don't have any effect until you call an action (and they wait until the caching has finished, which is the extra price to pay for better performance later on).

From Spark's official documentation, RDD Persistence:

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
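The behaviour the documentation describes can be mimicked without a cluster. The following plain-Scala sketch (a hypothetical `MiniRDD`, not Spark's real classes) shows the key points: `persist` only sets a flag, the upstream `map` runs the first time an action such as `count` is invoked, and later actions reuse the cached data:

```scala
// A plain-Scala sketch (no Spark involved) of lazy persist semantics.
// MiniRDD is a hypothetical stand-in, not Spark's real RDD class.
var mapCalls = 0 // counts how many times the map function actually runs

class MiniRDD[T](compute: () => Seq[T]) {
  private var cached: Option[Seq[T]] = None
  private var persistRequested = false

  // Transformation: builds a new plan, runs nothing yet
  def map[U](f: T => U): MiniRDD[U] =
    new MiniRDD(() => materialize().map(f))

  // persist only marks the RDD; it neither computes nor caches anything
  def persist(): this.type = { persistRequested = true; this }

  private def materialize(): Seq[T] = cached match {
    case Some(data) => data // later actions reuse the cached data
    case None =>
      val data = compute()
      if (persistRequested) cached = Some(data) // cache on first computation
      data
  }

  // Action: the first call triggers the whole upstream computation
  def count(): Long = materialize().size.toLong
}

val rdd1 = new MiniRDD(() => Seq(1, 2, 3))
val rdd2 = rdd1.map { x => mapCalls += 1; x * 2 }
rdd2.persist()       // mapCalls is still 0 here: persist ran nothing
val n = rdd2.count() // first action: map runs once per element, data is cached
rdd2.count()         // second action: cache is reused, map does not run again
```

This mirrors the question's snippet: the `map` work happens when `count` is encountered, not when `persist` is.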

That's exactly the reason why some people (and Spark SQL itself!) do the following trick:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

to trigger caching.

The count operator is fairly cheap, so the net effect is that caching is executed almost immediately after that line (there might be a small delay before the caching has completed, as it executes asynchronously).

The advantages of count right after persist are as follows:

  1. No action (but the count itself) will "suffer" the extra time for caching

  2. The time between this line and the place where the cached rdd2 is used could be enough to fully complete the caching, and hence the time would be used better (without an extra "slowdown" for caching)

So when you ask:

would persist be considered a transformation or an action or neither?

I'd say it's neither, and I'd consider it an optimization hint (that may or may not ever be executed or taken into account).

Use web UI's Storage tab to see what Datasets (as their underlying RDDs) have already been persisted.

You can also see cache or persist operators' output using explain (or simply QueryExecution.optimizedPlan).

val q1 = spark.range(10).groupBy('id % 5).count.cache
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [(id % 5)#84L, count#83L]
   +- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)])
            +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
               +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)])
                  +- *Range (0, 10, step=1, splits=8)

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)

// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

Seeing the InMemoryTableScan physical operator (with an InMemoryRelation logical plan) in the output is how you can make sure the query is cached in memory and hence reused.

Moreover, Spark SQL itself uses the same pattern to trigger DataFrame caching for SQL's CACHE TABLE query (which, unlike RDD caching, is by default eager):

if (!isLazy) {
  // Performs eager caching
  sparkSession.table(tableIdent).count()
}

That means that, depending on the operator, you may get different results as far as caching is concerned. The cache and persist operators are lazy by default, while SQL's CACHE TABLE is eager by default.
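Spark SQL exposes both behaviours directly: CACHE TABLE is eager by default, while the LAZY keyword defers the caching until the first query that scans the table, matching Dataset's cache/persist. A sketch (the table names are illustrative):

```sql
-- Eager (default): the query runs and its result is cached
-- as part of executing this statement
CACHE TABLE range5 AS SELECT * FROM range(5);

-- Lazy: only marks the table; caching happens when a later
-- query first scans it, just like Dataset.cache / persist
CACHE LAZY TABLE range5_lazy AS SELECT * FROM range(5);
```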
