Spark migrate SQL window function to RDD for better performance


Question


A function should be executed for multiple columns in a data frame:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def handleBias(df: DataFrame, colName: String, target: String = target) = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)

  // cnt_foo_eq_1 is added by the broadcast join in the minimal example below
  df.withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}

This can be written nicely, as shown above, with Spark SQL and a for loop. However, this causes a lot of shuffles (see: spark apply function to columns in parallel).

A minimal example:

  val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

  val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
  val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

  val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
    (currentDF, colName) => handleBias(currentDF, colName)
  }

  result.drop(columnsToDrop: _*).show

How can I formulate this more efficiently using the RDD API? aggregateByKey should be a good fit, but it is still not clear to me how to apply it here to replace the window functions.

(A bit more context / a bigger example: https://github.com/geoHeil/sparkContrastCoding)

edit

Initially, I started with the approach from Spark dynamic DAG is a lot slower and different from hard coded DAG, which is shown below. The good thing is that each column seems to run independently/in parallel. The downside is that the joins (even for a small dataset of 300 MB) get "too big" and lead to an unresponsive Spark.

handleBiasOriginal("col1", df)
    .join(handleBiasOriginal("col2", df), df.columns)
    .join(handleBiasOriginal("col3TooMany", df), df.columns)
    .drop(columnsToDrop: _*).show

  def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
    val pre1_1 = df
      .filter(df(target) === 1)
      .groupBy(col, target)
      .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
      .drop(target)

    val pre2_1 = df
      .groupBy(col)
      .agg(mean(target).alias("pre2_" + col))

    df
      .join(pre1_1, Seq(col), "left")
      .join(pre2_1, Seq(col), "left")
      .na.fill(0)
  }

This image is from Spark 2.1.0; the images in Spark dynamic DAG is a lot slower and different from hard coded DAG are from 2.0.2.

The DAG will be a bit simpler when caching is applied: df.cache followed by handleBiasOriginal("col1", df). ...

What possibilities other than window functions do you see to optimize the SQL? Ideally, the SQL would be generated dynamically.

Solution

The main point here is to avoid unnecessary shuffles. Right now your code shuffles twice for each column you want to include and the resulting data layout cannot be reused between columns.

For simplicity, I assume that target is always binary ({0, 1}) and that all the remaining columns you use are of StringType. Furthermore, I assume that the cardinality of the columns is low enough for the results to be grouped and handled locally. You can adjust these methods to handle other cases, but it requires more work.

RDD API

  • Reshape data from wide to long:

    import org.apache.spark.sql.functions._
    
    val exploded = explode(array(
      (columnsToDrop ++ columnsToCode).map(c => 
        struct(lit(c).alias("k"), col(c).alias("v"))): _*
    )).alias("level")
    
    val long = df.select(exploded, $"TARGET")
    

  • aggregateByKey, reshape and collect:

    import org.apache.spark.util.StatCounter
    
    val lookup = long.as[((String, String), Int)].rdd
      // You can use prefix partitioner (one that depends only on _._1)
      // to avoid reshuffling for groupByKey
      .aggregateByKey(StatCounter())(_ merge _, _ merge _)
      .map { case ((c, v), s) => (c, (v, s)) }
      .groupByKey
      .mapValues(_.toMap)
      .collectAsMap
    

  • You can use lookup to get statistics for individual columns and levels. For example:

    lookup("col1")("A")
    

    org.apache.spark.util.StatCounter = 
      (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
    

    This gives you the data for col1, level A. Based on the binary TARGET assumption this information is complete (you get counts / fractions for both classes); e.g., count 3 with mean 0.666667 means two rows with TARGET = 1 and one with TARGET = 0.

    You can use lookup like this to generate SQL expressions or pass it to a udf and apply it to individual columns, as sketched below.
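
    For example, here is a minimal sketch of the udf variant (an illustration, not part of the original answer). It assumes a SparkSession named spark is in scope and reuses df, columnsToDrop and columnsToCode from the question; the "pre2_" column names mirror the original handleBias, and the 0.0 default for unseen levels is an arbitrary choice:

    import org.apache.spark.sql.functions.{col, udf}

    // Broadcast the collected lookup so every task can read it cheaply.
    val lookupB = spark.sparkContext.broadcast(lookup)

    val withMeans = (columnsToDrop ++ columnsToCode).foldLeft(df) { (current, c) =>
      // Map each level of column c to its mean TARGET; levels missing from the
      // lookup fall back to 0.0.
      val meanForLevel = udf { (v: String) =>
        lookupB.value.get(c).flatMap(_.get(v)).map(_.mean).getOrElse(0.0)
      }
      current.withColumn("pre2_" + c, meanForLevel(col(c)))
    }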

DataFrame API

  • Convert data to long as for RDD API.
  • Compute aggregates based on levels:

    val stats = long
      .groupBy($"level.k", $"level.v")
      .agg(mean($"TARGET"), sum($"TARGET"))
    

  • Depending on your preferences, you can reshape this to enable efficient joins, or convert it to a local collection and handle it similarly to the RDD solution; a minimal sketch of the join-based variant follows.
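
    A minimal sketch of the join-based variant (an illustration, not part of the original answer), reusing df, stats, columnsToDrop and columnsToCode from above and assuming spark.implicits._ is in scope as elsewhere; the names mean_target, sum_target and the "pre2_" prefix are placeholders:

    import org.apache.spark.sql.functions.broadcast

    // Rename the aggregate columns positionally so they are easy to refer to.
    val named = stats.toDF("k", "v", "mean_target", "sum_target")

    // For every coded column, select its levels and join the per-level mean back on.
    val joined = (columnsToDrop ++ columnsToCode).foldLeft(df) { (current, c) =>
      val forCol = named
        .where($"k" === c)
        .select($"v".as(c), $"mean_target".as("pre2_" + c))
      current.join(broadcast(forCol), Seq(c), "left")
    }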
