In Apache Spark, how to make an RDD/DataFrame operation lazy?


Problem description

Assuming that I would like to write a function foo that transforms a DataFrame:

object Foo {
  def foo(source: DataFrame): DataFrame = {
    ...complex iterative algorithm with a stopping condition...
  }
}

Since the implementation of foo has many "Actions" (collect, reduce, etc.), calling foo will immediately trigger the expensive execution.
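For illustration, such a foo might look roughly like the sketch below: it keeps filtering until the row count stops shrinking, and every pass calls actions (count and an aggregation), so merely calling foo runs the whole loop eagerly (the value column is assumed here purely for the sketch):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col}

object Foo {
  // Hypothetical iterative algorithm: keep rows above the running mean of
  // `value` until the row count stops shrinking.
  def foo(source: DataFrame): DataFrame = {
    var current = source
    var previous = Long.MaxValue
    var size = current.count()                                      // action: a job runs immediately
    while (size > 0 && size < previous) {
      val mu = current.agg(avg(col("value"))).first().getDouble(0)  // action: another job
      current = current.filter(col("value") > mu)                   // transformation: lazy
      previous = size
      size = current.count()                                        // action: yet another job
    }
    current
  }
}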

This is not a big problem in itself. However, since foo only converts one DataFrame into another, by convention it would be better to allow lazy execution: the implementation of foo should run only when the resulting DataFrame or its derivative(s) are actually used on the driver (through another "Action").

So far, the only way to achieve this reliably is to write the whole implementation into a SparkPlan and superimpose it onto the DataFrame's SparkExecution, which is very error-prone and involves a lot of boilerplate code. What is the recommended way to do this?

Answer

It is not exactly clear to me what you are trying to achieve, but Scala itself provides at least a few tools which you may find useful:

  • lazy vals:

val rdd = sc.range(0, 10000)

lazy val count = rdd.count  // Nothing is executed here
// count: Long = <lazy>

count  // count is evaluated only when it is actually used 
// Long = 10000   

  • call-by-name (denoted by => in the function definition):

    def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
      if (takeFirst) first else second
    
    val rdd1 = sc.range(0, 10000)
    val rdd2 = sc.range(0, 10000)
    
    foo(
      { println("first"); rdd1.count },
      { println("second"); rdd2.count },
      true  // Only first will be evaluated
    )
    // first
    // Long = 10000
    

    Note: In practice you should create a local lazy binding to make sure that the arguments are not evaluated on every access.
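    A minimal sketch of that pattern, reusing the by-name foo above (each argument is referenced twice in the body, but evaluated at most once):

    def foo(first: => Long, second: => Long, takeFirst: Boolean): Long = {
      // Local lazy bindings: each by-name argument is evaluated at most once,
      // no matter how many times it is referenced below.
      lazy val f = first
      lazy val s = second
      if (takeFirst) f + f else s + s
    }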

  • infinite lazy collections, for example:

    import org.apache.spark.mllib.random.RandomRDDs._
    
    val initial = normalRDD(sc, 1000000L, 10)
    
    // Infinite stream of RDDs and actions and nothing blows :)
    // (declared lazy so the self-reference to `stream` inside append is safe
    //  wherever this is defined, not just in the REPL)
    lazy val stream: Stream[RDD[Double]] = Stream(initial).append(
      stream.map {
        case rdd if !rdd.isEmpty => 
          val mu = rdd.mean
          rdd.filter(_ > mu)
        case _ => sc.emptyRDD[Double]
      }
    )
    

Some subset of these should be more than enough to implement complex lazy computations.
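Applied to the foo from the question, the simplest of these is often already enough: wrapping the call in a lazy val at the call site defers every action inside foo until the result is first used on the driver (a sketch, reusing the names from the question):

import org.apache.spark.sql.DataFrame

// Nothing runs here: foo's internal actions fire only when `result`
// is referenced for the first time.
lazy val result: DataFrame = Foo.foo(source)

// ...later on the driver, this first use triggers the whole iterative algorithm:
result.count()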
