Spark dynamic DAG is a lot slower and different from hard coded DAG


Problem description

I have an operation in Spark which should be performed for several columns in a data frame. Generally, there are two possibilities to specify such operations:

  • Hard-coded:
handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*).show

  • Generated dynamically from a list of column names (a foldLeft version of this loop is sketched below):

var res = df
// apply handleBias once per column; each call consumes the previous result
for (col <- columnsToDrop ++ columnsToCode) {
  res = handleBias(col, res)
}
res.drop(columnsToDrop: _*).show
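For reference, the foldLeft form that the edit further down refers to is just this loop folded over the column list; a minimal sketch, assuming the same handleBias signature as above:

// Sketch: the same chain expressed as a foldLeft, threading the
// accumulated DataFrame through handleBias once per column.
val res = (columnsToDrop ++ columnsToCode).foldLeft(df) {
  (acc, col) => handleBias(col, acc)
}
res.drop(columnsToDrop: _*).show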
    

The problem is that the dynamically generated DAG is different, and when more columns are used, the runtime of the dynamic solution increases far more than for the hard-coded operations.

I am curious how to combine the elegance of the dynamic construction with fast execution times.

Here is the comparison of the DAGs for the example code (the DAG screenshots themselves are not reproduced here).

For around 80 columns this results in a rather nice graph for the hard-coded variant, and a very large, probably less parallelizable and far slower DAG for the dynamically constructed query.

A current version of Spark (2.0.2) was used, with DataFrames and spark-sql.

Code to complete the minimal example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  // Share of target == 1 rows per (col, target) group, relative to all
  // target == 1 rows. Note: the filter + count action here is an eager
  // computation that is re-evaluated on every call.
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  // Mean of the target per value of col.
  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}
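An aside on this minimal example: the df.filter(df(target) === 1).count inside the aggregation is an action, and it is re-executed for every column the function is applied to. A hedged sketch of hoisting that count out follows; the handleBiasCached name and totalPos parameter are illustrative, not part of the original code, and the answer below solves the same problem with a broadcast join instead:

// Sketch: compute the target == 1 count once and pass it in,
// instead of re-running filter + count inside every call.
def handleBiasCached(col: String, df: DataFrame, totalPos: Long,
                     target: String = "FOO"): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / totalPos).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}

// computed once, outside the per-column loop:
// val totalPos = df.filter(df("FOO") === 1).count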
    

Edit

Running your task with foldLeft generates a linear DAG, and hard coding the function for all the columns also results in an improved graph (the DAG screenshots are not reproduced here).

Both are a lot better than my original DAGs, but the hard-coded variant still looks better to me. Concatenating a SQL statement as a string in Spark could let me dynamically generate the hard-coded execution graph, but that seems rather ugly. Do you see any other option?
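For illustration, the string-concatenation approach mentioned above might look roughly like the sketch below; the tmp_table view name and the window expression are placeholders rather than the real handleBias logic, and spark is assumed to be the active SparkSession:

// Illustrative only: build one SQL string covering all columns so the
// parser produces a single flat plan, instead of chaining DataFrame calls.
df.createOrReplaceTempView("tmp_table")
val exprs = columnsToCode.map { c =>
  s"mean(FOO) OVER (PARTITION BY `$c`) AS `pre2_$c`"
}
val generated = spark.sql(s"SELECT *, ${exprs.mkString(", ")} FROM tmp_table")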

Answer

Edit 1: Removed one window function from handleBias and transformed it into a broadcast join.

Edit 2: Changed the replacement strategy for null values.

I have some suggestions that can improve your code. First, for the handleBias function, I would implement it using window functions and withColumn calls, avoiding the joins:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    // "cnt_foo_eq_1" is attached to the DataFrame beforehand via a broadcast join (see below)
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
  result
}
    

Then, for calling it for multiple columns, I would recommend using foldLeft, which is the "functional" approach for this kind of problem:

val df = Seq(
  (1, "first", "A"),
  (1, "second", "A"),
  (2, "noValidFormat", "B"),
  (1, "lastAssumingSameDate", "C")
).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

// Count the target == 1 rows once and attach that count to every row
// via a broadcast join, so handleBias never has to recompute it.
val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

// Fold over the distinct columns, threading the DataFrame through handleBias.
// The fold must start from newDF, which carries the "cnt_foo_eq_1" column.
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}

// Drop the helper count column together with columnsToDrop before showing.
result.drop("cnt_foo_eq_1").drop(columnsToDrop: _*).show()
    
    +---+--------------------+------------------+--------+------------------+--------+
    |foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
    +---+--------------------+------------------+--------+------------------+--------+
    |  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
    |  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
    |  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
    |  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
    +---+--------------------+------------------+--------+------------------+--------+
    

I'm not sure it will improve your DAG a lot, but at least it makes the code cleaner and more readable.

