Spark dynamic DAG is a lot slower and different from hard coded DAG


Problem description

I have an operation in Spark which should be performed for several columns in a data frame. Generally, there are two possibilities to specify such operations:

• Hard coded:

  handleBias("bar", df)
    .join(handleBias("baz", df), df.columns)
    .drop(columnsToDrop: _*).show

• Generated dynamically from a list of names:

  var isFirst = true
  var res = df
  for (col <- columnsToDrop ++ columnsToCode) {
    if (isFirst) {
      res = handleBias(col, res)
      isFirst = false
    } else {
      // note: both branches are identical, so the flag has no effect
      res = handleBias(col, res)
    }
  }
  res.drop(columnsToDrop: _*).show
    

The problem is that the dynamically generated DAG is different, and the runtime of the dynamic solution increases far more than that of the hard-coded operations as more columns are used.

I am curious how to combine the elegance of the dynamic construction with quick execution times.

Here is the comparison of the DAGs for the example code. For around 80 columns, the hard-coded variant results in a rather nice graph, while the dynamically constructed query produces a very big, probably less parallelizable, and much slower DAG.

A current version of Spark (2.0.2) was used, with DataFrames and Spark SQL.

Code to complete the minimal example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}
    

Edit

Running the task with foldLeft (sketched below) generates a linear DAG, and hard coding the function for all the columns also yields an improved DAG.
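
A minimal sketch of what the foldLeft variant referred to above might look like, assuming the handleBias signature from the example (an illustration, not necessarily the exact code used):

  // Fold handleBias over the column list instead of the mutable var/for loop.
  val folded = (columnsToDrop ++ columnsToCode).foldLeft(df) {
    (currentDF, colName) => handleBias(colName, currentDF)
  }
  folded.drop(columnsToDrop: _*).show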

Both are a lot better than my original DAGs, but the hard-coded variant still looks better to me. Concatenating a SQL statement as a string in Spark could allow me to generate the hard-coded execution graph dynamically (see the sketch below), but that seems rather ugly. Do you see any other option?
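
For illustration only, a hypothetical sketch of that string-concatenation idea; the temp view name "input" and the generated expressions are made up for this example, and spark is assumed to be an active SparkSession:

  // Build a single SQL statement with one window expression per column,
  // so the whole query is planned at once rather than join by join.
  df.createOrReplaceTempView("input")
  val dynamicExprs = columnsToCode
    .map(c => s"avg(FOO) over (partition by `$c`) as `pre2_$c`")
    .mkString(", ")
  val viaSql = spark.sql(s"select *, $dynamicExprs from input")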

Answer

Edit 1: Removed one window function from handleBias and transformed it into a broadcast join.

Edit 2: Changed the replacement strategy for null values.

I have some suggestions that can improve your code. First, for the handleBias function, I would do it using window functions and withColumn calls, avoiding the joins:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    // cnt_foo_eq_1 is produced by the broadcast join in the driver code below
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
  result
}
    

Then, to call it for multiple columns, I would recommend using foldLeft, which is the "functional" approach to this kind of problem:

val df = Seq((1, "first", "A"), (1, "second", "A"), (2, "noValidFormat", "B"), (1, "lastAssumingSameDate", "C"))
  .toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

// fold over newDF (not df): handleBias needs the cnt_foo_eq_1 column from the join
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}

// also drop the helper count column from the final output
result.drop(columnsToDrop: _*).drop("cnt_foo_eq_1").show()
    
    +---+--------------------+------------------+--------+------------------+--------+
    |foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
    +---+--------------------+------------------+--------+------------------+--------+
    |  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
    |  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
    |  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
    |  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
    +---+--------------------+------------------+--------+------------------+--------+
    

I'm not sure it will improve your DAG a lot, but at least it makes the code cleaner and more readable.

References:

    • Databricks article on Window Functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
    • API docs for the available functions: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
    • foldLeft: https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright
