Spark migrate SQL window function to RDD for better performance


Question

A function should be executed for multiple columns in a data frame

def handleBias(df: DataFrame, colName: String, target: String = target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    // cnt_foo_eq_1 is expected to be present on df already
    // (it is added by the broadcast join with targetCounts shown below)
    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }

This can be written nicely as shown above with Spark SQL and a for loop. However, this causes a lot of shuffles (see "spark apply function to columns in parallel").

A minimal example:

  val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

  val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
  val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

  val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
    (currentDF, colName) => handleBias(currentDF, colName)
  }

  result.drop(columnsToDrop: _*).show

How can I formulate this more efficiently using the RDD API? aggregateByKey should be a good idea, but it is still not very clear to me how to apply it here to substitute the window functions.

(A bit more context / a bigger example is provided at https://github.com/geoHeil/sparkContrastCoding.)

Initially, I started with "Spark dynamic DAG is a lot slower and different from hard coded DAG", which is shown below. The good thing is that each column seems to run independently / in parallel. The downside is that the joins (even for a small dataset of 300 MB) get "too big" and lead to an unresponsive Spark.

handleBiasOriginal("col1", df)
    .join(handleBiasOriginal("col2", df), df.columns)
    .join(handleBiasOriginal("col3TooMany", df), df.columns)
    .drop(columnsToDrop: _*).show

  def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
    val pre1_1 = df
      .filter(df(target) === 1)
      .groupBy(col, target)
      .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
      .drop(target)

    val pre2_1 = df
      .groupBy(col)
      .agg(mean(target).alias("pre2_" + col))

    df
      .join(pre1_1, Seq(col), "left")
      .join(pre2_1, Seq(col), "left")
      .na.fill(0)
  }

The DAG image referenced here is with Spark 2.1.0; the images in "Spark dynamic DAG is a lot slower and different from hard coded DAG" are with 2.0.2.

The DAG is a bit simpler when caching is applied: df.cache; handleBiasOriginal("col1", df); ...

What possibilities other than window functions do you see to optimize the SQL? Ideally, the SQL would be generated dynamically.

Recommended answer

The main point here is to avoid unnecessary shuffles. Right now your code shuffles twice for each column you want to include and the resulting data layout cannot be reused between columns.

For simplicity I assume that target is always binary ({0, 1}) and that all remaining columns you use are of StringType. Furthermore, I assume that the cardinality of the columns is low enough for the results to be grouped and handled locally. You can adjust these methods to handle other cases, but it requires more work.
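
A quick way to sanity-check the low-cardinality assumption before collecting anything to the driver (a small sketch, not part of the original answer):

    import org.apache.spark.sql.functions.{col, countDistinct}

    // Sketch: make sure each encoded column has few enough distinct levels
    // for its per-level statistics to be collected and handled locally.
    val cardinalities = (columnsToDrop ++ columnsToCode).map { c =>
      c -> df.select(countDistinct(col(c))).first.getLong(0)
    }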

RDD API

  • Reshape data from wide to long:

import org.apache.spark.sql.functions._

val exploded = explode(array(
  (columnsToDrop ++ columnsToCode).map(c => 
    struct(lit(c).alias("k"), col(c).alias("v"))): _*
)).alias("level")

val long = df.select(exploded, $"TARGET")
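
For reference, long now has one row per (original column, value) pair, which is exactly the key the next step aggregates on; a quick way to inspect it (a sketch, using the example df from the question):

    long.select($"level.k", $"level.v", $"TARGET").show()
    // rows such as (col3TooMany, C, 0), (col1, A, 0), (col2, B, 0), ...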

  • aggregateByKey, reshape and collect:

    import org.apache.spark.util.StatCounter
    
    val lookup = long.as[((String, String), Int)].rdd
      // You can use prefix partitioner (one that depends only on _._1)
      // to avoid reshuffling for groupByKey
      .aggregateByKey(StatCounter())(_ merge _, _ merge _)
      .map { case ((c, v), s) => (c, (v, s)) }
      .groupByKey
      .mapValues(_.toMap)
      .collectAsMap
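
    The "prefix partitioner" mentioned in the comment is not a built-in Spark class. A minimal sketch of the idea (the name PrefixPartitioner is hypothetical): hash only the column name, so all records of one column land in the same partition and can then be grouped within that partition (for example with mapPartitions) instead of being shuffled again by groupByKey. It would be passed as the second argument of aggregateByKey, which has an overload accepting a Partitioner.

    import org.apache.spark.Partitioner

    // Hypothetical sketch: partition (columnName, value) keys by the column name only.
    class PrefixPartitioner(override val numPartitions: Int) extends Partitioner {
      private def nonNegativeMod(x: Int): Int = {
        val m = x % numPartitions
        if (m < 0) m + numPartitions else m
      }
      def getPartition(key: Any): Int = key match {
        case (prefix, _) => nonNegativeMod(if (prefix == null) 0 else prefix.hashCode)
        case other       => nonNegativeMod(if (other == null) 0 else other.hashCode)
      }
    }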
    

  • You can use lookup to get statistics for individual columns and levels. For example:

    lookup("col1")("A")
    

    org.apache.spark.util.StatCounter = 
      (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
    

    This gives you the data for col1, level A. Based on the binary TARGET assumption, this information is complete (you get counts / fractions for both classes).

    You can use a lookup like this to generate SQL expressions or pass it to a udf and apply it to individual columns, as in the sketch below.
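
    As a minimal sketch of the udf route (not the answer's literal code): the hypothetical helper below looks up the per-level mean of TARGET and falls back to 0.0 for unseen levels, which reproduces the pre2_ columns of handleBias; the pre_ fractions can be derived from the same counts.

    import org.apache.spark.sql.functions.{col, udf}

    // Hypothetical helper: encode a column with the mean TARGET of each of its levels,
    // read from the locally collected `lookup` map; 0.0 mirrors the na.fill(0) above.
    def meanTargetEncoder(colName: String) = udf { v: String =>
      lookup.get(colName).flatMap(_.get(v)).map(_.mean).getOrElse(0.0)
    }

    val encoded = (columnsToDrop ++ columnsToCode).foldLeft(df) { (current, c) =>
      current.withColumn("pre2_" + c, meanTargetEncoder(c)(col(c)))
    }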

DataFrame API

  • Convert the data to long format, as for the RDD API.
  • Compute aggregates based on levels:

    val stats = long
      .groupBy($"level.k", $"level.v")
      .agg(mean($"TARGET"), sum($"TARGET"))
    

  • Depending on your preferences, you can reshape this to enable efficient joins, or convert it to a local collection similarly to the RDD solution; a rough sketch of the join variant follows.
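
    A rough sketch of the join-based variant (not the answer's literal code): give the four columns of stats stable names, then join the per-level means back onto df with one small broadcast join per column, reusing the single aggregation instead of paying one shuffle per column.

    import org.apache.spark.sql.functions.broadcast

    // Sketch: `named` only renames the columns of `stats` (key, value, mean, sum).
    val named = stats.toDF("k", "v", "mean_target", "sum_target")

    val joined = (columnsToDrop ++ columnsToCode).foldLeft(df) { (current, c) =>
      val colStats = named
        .where($"k" === c)
        .select($"v".alias(c), $"mean_target".alias("pre2_" + c))
      current.join(broadcast(colStats), Seq(c), "left")
    }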
