Spark UDF called more than once per record when DF has too many columns

Problem description

I'm using Spark 1.6.1 and encountering a strange behaviour: I'm running a UDF with some heavy computations (a physics simulation) on a DataFrame containing some input data, and building up a result DataFrame containing many columns (~40).

Strangely, my UDF is called more than once per record of my input DataFrame in this case (1.6 times more often), which I find unacceptable because it's very expensive. If I reduce the number of columns (e.g. to 20), then this behavior disappears.

I managed to write down a small script which demonstrates this:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf


object Demo {

  case class Result(a: Double)

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val numRuns = sc.accumulator(0) // to count the number of udf calls

    val myUdf = udf((i: Int) => { numRuns.add(1); Result(i.toDouble) })

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")

    // get results of UDF
    var results = data
      .withColumn("tmp", myUdf($"id"))
      .withColumn("result", $"tmp.a")


    // add many columns to dataframe (must depend on the UDF's result)
    for (i <- 1 to 42) {
      results = results.withColumn(s"col_$i", $"result")
    }

    // trigger action
    val res = results.collect()
    println(res.size) // prints 100

    println(numRuns.value) // prints 160

  }
}
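As a side note (not something from the original post), one way to see where the extra calls come from is to print the query plan before the collect: if the UDF expression shows up in more than one branch of the physical plan, Spark evaluates it more than once. A minimal diagnostic sketch against the results DataFrame from the script above:

// print the extended (logical + physical) plan; duplicated occurrences of
// the UDF expression indicate it will be evaluated more than once
results.explain(true)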

Now, is there a way to solve this without reducing the number of columns?

Recommended answer

I can't really explain this behavior - but obviously the query plan somehow chooses a path where some of the records are calculated twice. This means that if we cache the intermediate result (right after applying the UDF) we might be able to "force" Spark not to recompute the UDF. And indeed, once caching is added it behaves as expected - the UDF is called exactly 100 times:

// get results of UDF
var results = data
  .withColumn("tmp", myUdf($"id"))
  .withColumn("result", $"tmp.a").cache()

Of course, caching has its own costs (memory...), but it might end up beneficial in your case if it saves many UDF calls.
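If a plain cache() puts too much pressure on memory, one possible middle ground (my own assumption, not part of the original answer) is to persist the intermediate DataFrame with a storage level that can spill to disk, and to release it once the result has been collected:

import org.apache.spark.storage.StorageLevel

// persist the UDF output so the extra columns reuse it; MEMORY_AND_DISK
// spills partitions to disk instead of dropping (and recomputing) them
var results = data
  .withColumn("tmp", myUdf($"id"))
  .withColumn("result", $"tmp.a")
  .persist(StorageLevel.MEMORY_AND_DISK)

for (i <- 1 to 42) {
  results = results.withColumn(s"col_$i", $"result")
}

val res = results.collect()

// release the cached partitions once they are no longer needed
results.unpersist()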
