Spark dynamic DAG is a lot slower and different from hard coded DAG
Question
I have an operation in Spark that should be performed for several columns in a data frame. Generally, there are two ways to specify such operations:
- hard coded:
handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*).show
- generated dynamically from a list of names:
// both branches of the original if/else did the same thing,
// so the isFirst flag is dead code and the loop reduces to:
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
  res = handleBias(col, res)
}
res.drop(columnsToDrop: _*).show
The problem is that the dynamically generated DAG is different, and its runtime increases far more than that of the hard-coded operations when more columns are used.
I am curious how to combine the elegance of the dynamic construction with quick execution times.
Here is a comparison of the DAGs for the example code.
For around 80 columns this results in a rather nice graph for the hard-coded variant, and a very big, probably less parallelizable, and much slower DAG for the dynamically constructed query.
A current version of Spark (2.0.2) was used with DataFrames and spark-sql.
Code for a complete minimal example:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)
  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))
  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}
edit

Running the task with foldLeft generates a linear DAG, and hard coding the function for all the columns results in another graph (DAG screenshots not reproduced here).
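The mutable for-loop from the question can be rewritten as a foldLeft over the column list; a minimal sketch, assuming the same handleBias, df, columnsToDrop, and columnsToCode as above:

```scala
// Each fold step threads the accumulated DataFrame through handleBias
// for the next column name; no mutable vars needed.
val res = (columnsToDrop ++ columnsToCode).foldLeft(df) {
  (acc, colName) => handleBias(colName, acc)
}
res.drop(columnsToDrop: _*).show
```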
Both are a lot better than my original DAGs, but the hard-coded variant still looks better to me. Concatenating a SQL statement string in Spark could let me dynamically generate the hard-coded execution graph, but that seems rather ugly. Do you see any other option?
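For what it's worth, the string-concatenation route could look like the sketch below. This is purely illustrative: it assumes an active SparkSession named spark, registers the frame as a temp view, and only inlines a window-based pre2_ expression per column (avg(...) OVER is standard Spark SQL):

```scala
// Hypothetical sketch: build one flat SELECT with a per-column window
// aggregate, so Spark plans a single query instead of a chain of joins.
df.createOrReplaceTempView("input")
val exprs = columnsToCode.map { c =>
  s"avg($target) OVER (PARTITION BY $c) AS pre2_$c"
}
val sql = s"SELECT *, ${exprs.mkString(", ")} FROM input"
val flat = spark.sql(sql)
```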
Accepted answer
Edit 1: Removed one window function from handleBias and transformed it into a broadcast join.
Edit 2: Changed the replacing strategy for null values.
I have some suggestions that can improve your code. First, for the handleBias function, I would do it using window functions and withColumn calls, avoiding the joins:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName,
      coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}
Then, for calling it for multiple columns, I would recommend using foldLeft, which is the "functional" approach for this kind of problem:
val df = Seq(
  (1, "first", "A"),
  (1, "second", "A"),
  (2, "noValidFormat", "B"),
  (1, "lastAssumingSameDate", "C")
).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

// fold over newDF (not df) so that cnt_foo_eq_1 is available to handleBias
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show()
+---+--------------------+------------------+--------+------------------+--------+
|foo| bar| pre_baz|pre2_baz| pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
| 2| noValidFormat| 0.0| 2.0| 0.0| 2.0|
| 1|lastAssumingSameDate|0.3333333333333333| 1.0|0.3333333333333333| 1.0|
| 1| second|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
| 1| first|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
+---+--------------------+------------------+--------+------------------+--------+
I'm not sure it will improve your DAG a lot, but at least it makes the code cleaner and more readable.
References:
- Databricks article on window functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
- API docs for the available functions: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
- foldLeft: https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright