Spark windowing function VS group by performance issue


Question

I have a dataframe that looks like this:

| id | date      |  KPI_1 | ... | KPI_n
| 1  |2012-12-12 |   0.1  | ... |  0.5
| 2  |2012-12-12 |   0.2  | ... |  0.4
| 3  |2012-12-12 |   0.66 | ... |  0.66 
| 1  |2012-12-13 |   0.2  | ... |  0.46
| 4  |2012-12-14 |   0.2  | ... |  0.45 
| ...
| 55| 2013-03-15 |  0.5  | ... |  0.55

We have:

  • X identifiers
  • one row per identifier for a given date
  • n KPIs

I have to calculate some derived KPI for every row, and this KPI depends on the previous values of each ID. Let's say my derived KPI is a diff; it would be:

| id | date      |  KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff
| 1  |2012-12-12 |   0.1  | ... |  0.5  |   0.1      | 0.5
| 2  |2012-12-12 |   0.2  | ... |  0.4  |   0.2      |0.4
| 3  |2012-12-12 |   0.66 | ... |  0.66 |   0.66     | 0.66 
| 1  |2012-12-13 |   0.2  | ... |  0.46 |   0.2 - 0.1  | 0.46 - 0.5
| 4  |2012-12-13 |   0.2  | ... |  0.45  ...
| ...
| 55| 2013-03-15 |  0.5  | ... |  0.55

Now, here is what I do:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val groupedDF = myDF.groupBy("id").agg(
    collect_list(struct(col("date"), col("KPI_1"))).as("wrapped_KPI_1"),
    collect_list(struct(col("date"), col("KPI_2"))).as("wrapped_KPI_2")
    // ... up to the nth KPI
)

and I would get aggregated data such as:

[("2012-12-12",0.1),("2012-12-12",0.2) ...

Then I would sort this wrapped data, unwrap it, and map over the aggregated result with some UDF to produce the output (compute the diffs and other statistics).
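
For illustration only (none of these names appear in the original post), here is a sketch of that sort-unwrap-map step for a single KPI, assuming the date column is stored as an ISO-formatted string and the derived KPI is the diff shown above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// Hypothetical UDF: sort the (date, value) pairs by date and emit
// (date, value, diff from the previous value); the first row diffs against 0.0,
// matching the example table above.
val diffUdf = udf { wrapped: Seq[Row] =>
  val sorted = wrapped
    .map(r => (r.getString(0), r.getDouble(1)))   // assumes date is an ISO string
    .sortBy(_._1)
  sorted.zip(("", 0.0) +: sorted).map {           // pair each row with its predecessor
    case ((d, v), (_, prev)) => (d, v, v - prev)
  }
}

// Explode back to one row per (id, date); field _3 of the struct is the diff.
val kpi1Diffs = groupedDF.select(
  col("id"),
  explode(diffUdf(col("wrapped_KPI_1"))).as("KPI_1_with_diff")
)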

Another approach is to use window functions, such as:

val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding,0L) 

and then:

val windowedDF = df.select (
  col("id"),
  col("date"),
  col("KPI_1"),
  collect_list(struct(col("date"),col("KPI_1"))).over(window),  
  collect_list(struct(col("date"),col("KPI_2"))).over(window)
   )   

so that I get:

[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...

That looks nicer to process, but I suspect that repeating the window would produce unnecessary grouping and sorting for every KPI.
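
One way to check that suspicion (not mentioned in the post) is to inspect the physical plan: window functions that share the exact same window specification should be evaluated in a single Window operator, with one Exchange and one Sort underneath.

// If the plan shows a single "Exchange hashpartitioning(id, ...)" and a single Sort,
// the repeated .over(window) calls did not add extra shuffles or sorts.
windowedDF.explain()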

So here are the questions:

  1. Should I go with the groupBy approach?
  2. Or should I go with window functions? If so, what is the most efficient way to do it?

Answer

I believe the window approach should be the better solution, but before using the window functions you should re-partition the dataframe based on id. This will shuffle the data only once, and all the window functions will then be executed on the already-shuffled dataframe. I hope it helps.

The code should look like this:

// "window" here is the spec defined in the question:
// Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding, 0L)
val windowedDF = df.repartition(col("id"))
  .select (
  col("id"),
  col("date"),
  col("KPI_1"),
  col("KPI_2"),
  collect_list(struct(col("date"),col("KPI_1"))).over(window),
  collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
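
As a side note that goes beyond the original answer: if the derived KPI really is just a difference from the previous row, lag() over the same partition-by-id window avoids collecting lists altogether. A minimal sketch, reusing the column names from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One ordered window per id; lag(..., 1) looks one row back within the same id.
val diffWindow = Window.partitionBy(col("id")).orderBy(col("date"))

val diffDF = df.repartition(col("id"))
  .withColumn("KPI_1_diff", col("KPI_1") - coalesce(lag(col("KPI_1"), 1).over(diffWindow), lit(0.0)))
  .withColumn("KPI_2_diff", col("KPI_2") - coalesce(lag(col("KPI_2"), 1).over(diffWindow), lit(0.0)))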


@Raphael Roth

Here we are aggregating over a single window, which is why you might be seeing the same execution plan. Please see the example below, where aggregation over multiple windows can be served by a single repartitioning: the plan for agg1df contains one Exchange per window, while the repartitioned agg2df needs only one Exchange in total.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // for toDF and the $"..." syntax

val list = Seq(( "2", null, 1, 11, 1, 1 ),
  ( "2", null, 1, 22, 2, 2 ),
  ( "2", null, 1, 11, 1, 3 ),
  ( "2", null, 1, 22, 2, 1 ),
  ( "2", null, 1, 33, 1, 2 ),
  ( null, "3", 3, 33, 1, 2 ),
  ( null, "3", 3, 33, 2, 3 ),
  ( null, "3", 3, 11, 1, 1 ),
  ( null, "3", 3, 22, 2, 2 ),
  ( null, "3", 3, 11, 1, 3 )
)

val df = spark.sparkContext.parallelize(list).toDF("c1","c2","batchDate","id", "pv" , "vv")

val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")

// agg1df: two different window partitionings, so Spark shuffles once per window spec
val agg1df = df.withColumn("c1List", collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))

// agg2df: repartitioning on batchDate up front co-locates rows for both window
// partitionings, so only one shuffle appears in the plan
val agg2df = df.repartition($"batchDate")
  .withColumn("c1List", collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))


agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(batchDate#16, c2#15, 1)
      +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
         +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(batchDate#16, c1#14, 1)
               +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                     +- Scan ExternalRDDScan[obj#6]

agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
      +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(batchDate#16, 1)
            +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
               +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                  +- Scan ExternalRDDScan[obj#6]

