Spark UDF called more than once per record when DF has too many columns


Question

I'm using Spark 1.6.1 and encountering strange behaviour: I'm running a UDF with some heavy computation (a physics simulation) on a DataFrame containing some input data, and building up a result DataFrame containing many columns (~40).

Strangely, in this case my UDF is called more than once per record of the input DataFrame (1.6 times as often), which I find unacceptable because it is very expensive. If I reduce the number of columns (e.g. to 20), this behaviour disappears.

I managed to write a small script which demonstrates this:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf


object Demo {

  case class Result(a: Double)

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val numRuns = sc.accumulator(0) // to count the number of udf calls

    val myUdf = udf((i: Int) => { numRuns.add(1); Result(i.toDouble) })

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")

    // get results of UDF
    var results = data
      .withColumn("tmp", myUdf($"id"))
      .withColumn("result", $"tmp.a")


    // add many columns to dataframe (must depend on the UDF's result)
    for (i <- 1 to 42) {
      results = results.withColumn(s"col_$i", $"result")
    }

    // trigger action
    val res = results.collect()
    println(res.size) // prints 100

    println(numRuns.value) // prints 160

  }
}
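
A quick way to look at what the planner does with this chain of withColumn calls is to print the extended plan before the action. The explain(true) call is a standard Spark 1.6 DataFrame method, but this line is an addition to the script above, not part of the measurement:

// Inspect the analyzed, optimized and physical plans; if the UDF expression
// shows up in more than one projection, some rows may be evaluated repeatedly.
results.explain(true)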

Now, is there a way to solve this without reducing the number of columns?

Answer

I can't really explain this behaviour, but obviously the query plan somehow chooses a path where some of the records are calculated twice. This means that if we cache the intermediate result (right after applying the UDF), we might be able to "force" Spark not to recompute the UDF. And indeed, once caching is added it behaves as expected: the UDF is called exactly 100 times:

// get results of UDF
var results = data
  .withColumn("tmp", myUdf($"id"))
  .withColumn("result", $"tmp.a").cache()

Of course, caching has its own cost (memory...), but it might end up beneficial in your case if it saves many UDF calls.
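
If keeping the full intermediate result in memory is a concern, a variant of the same idea (a sketch, not tested against the original job) is to persist with a storage level that may spill to disk and to release it once the action has finished; persist(StorageLevel) and unpersist() are available on DataFrames in Spark 1.6:

import org.apache.spark.storage.StorageLevel

// Materialize the UDF output once, allowing spill to disk if it does not fit in memory.
var results = data
  .withColumn("tmp", myUdf($"id"))
  .withColumn("result", $"tmp.a")
  .persist(StorageLevel.MEMORY_AND_DISK)

// ... add the dependent columns and trigger the action as before ...

// Free the cached blocks once the results have been collected.
results.unpersist()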
