在 Apache Spark 中,如何使 RDD/DataFrame 操作变得懒惰? [英] In Apache Spark, how to make an RDD/DataFrame operation lazy?
问题描述
假设我想编写一个函数 foo 来转换 DataFrame:
Assuming that I would like to write a function foo that transforms a DataFrame:
object Foo {
def foo(source: DataFrame): DataFrame = {
...complex iterative algorithm with a stopping condition...
}
}
由于foo的实现有很多Action"(collect、reduce等),调用foo会立即触发代价高昂的执行.
since the implementation of foo has many "Actions" (collect, reduce etc.), calling foo will immediately triggers the expensive execution.
这不是一个大问题,但是由于 foo 只将一个 DataFrame 转换为另一个,按照惯例,最好允许延迟执行:只有在结果 DataFrame 或其派生的结果是正在驱动程序上使用(通过另一个操作").
This is not a big problem, however since foo only converts a DataFrame to another, by convention it should be better to allow lazy execution: the implementation of foo should be executed only if the resulted DataFrame or its derivative(s) are being used on the Driver (through another "Action").
到目前为止,可靠地实现这一点的唯一方法是将所有实现写入 SparkPlan,并将其叠加到 DataFrame 的 SparkExecution 中,这非常容易出错并且涉及大量样板代码.推荐的方法是什么?
So far, the only way to reliably achieve this is through writing all implementations into a SparkPlan, and superimpose it into the DataFrame's SparkExecution, this is very error-prone and involves lots of boilerplate codes. What is the recommended way to do this?
推荐答案
我不太清楚您尝试实现的目标,但 Scala 本身至少提供了一些您可能会觉得有用的工具:
It is not exactly clear to me what you try to achieve but Scala itself provides at least few tools which you may find useful:
懒惰的vals:
lazy vals:
val rdd = sc.range(0, 10000)
lazy val count = rdd.count // Nothing is executed here
// count: Long = <lazy>
count // count is evaluated only when it is actually used
// Long = 10000
call-by-name(在函数定义中用=>
表示):
def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
if (takeFirst) first else second
val rdd1 = sc.range(0, 10000)
val rdd2 = sc.range(0, 10000)
foo(
{ println("first"); rdd1.count },
{ println("second"); rdd2.count },
true // Only first will be evaluated
)
// first
// Long = 10000
注意:在实践中,您应该创建本地延迟绑定,以确保不会在每次访问时评估参数.
Note: In practice you should create local lazy binding to make sure that arguments are not evaluated on every access.
无限惰性集合,如流
infinite lazy collections like Stream
import org.apache.spark.mllib.random.RandomRDDs._
val initial = normalRDD(sc, 1000000L, 10)
// Infinite stream of RDDs and actions and nothing blows :)
val stream: Stream[RDD[Double]] = Stream(initial).append(
stream.map {
case rdd if !rdd.isEmpty =>
val mu = rdd.mean
rdd.filter(_ > mu)
case _ => sc.emptyRDD[Double]
}
)
其中的一些子集应该足以实现复杂的惰性计算.
Some subset of these should be more than enough to implement complex lazy computations.
这篇关于在 Apache Spark 中,如何使 RDD/DataFrame 操作变得懒惰?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!