Spark: apply function to columns in parallel


Problem description

Spark will process the data in parallel, but not the operations. In my DAG I want to call a function per column, as in Spark processing columns in parallel; the values for each column can be calculated independently of the other columns. Is there any way to achieve such parallelism via the Spark SQL API? Utilizing window functions (see Spark dynamic DAG is a lot slower and different from hard coded DAG) helped to optimize the DAG by a lot, but it only executes in a serial fashion.

An example with a little more information can be found at https://github.com/geoHeil/sparkContrastCoding

A minimal example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark is in scope

val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

val inputToDrop = Seq("col3TooMany")
val inputToBias = Seq("col1", "col2")

// count of rows with TARGET == 1, broadcast onto every row
val targetCounts = df.filter(df("TARGET") === 1).groupBy("TARGET").agg(count("TARGET").as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq("TARGET"), "left")
newDF.cache

// per-column statistics via two window specs: one over the column alone,
// one over (column, target)
def handleBias(df: DataFrame, colName: String, target: String = "TARGET") = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }
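
Applied to a single column, this can be sanity-checked directly against the newDF defined above:

// adds pre_col1 and pre2_col1 alongside the original columns
handleBias(newDF, "col1").show()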

// look up the precomputed statistics for (column, value); variant 0 takes the first encoding, anything else the last
val joinUDF = udf((newColumn: String, newValue: String, codingVariant: Int, results: Map[String, Map[String, Seq[Double]]]) => {
    results.get(newColumn) match {
      case Some(tt) => {
        val nestedArray = tt.getOrElse(newValue, Seq(0.0))
        if (codingVariant == 0) {
          nestedArray.head
        } else {
          nestedArray.last
        }
      }
      case None => throw new Exception("Column not contained in initial data frame")
    }
  })

Now I want to apply my handleBias function to all the columns; unfortunately, this is not executed in parallel:

// fold handleBias over all columns -- each step extends the previous plan serially
// (logger is assumed to be defined elsewhere, e.g. via slf4j)
val res = (inputToDrop ++ inputToBias).toSet
  .foldLeft(newDF) { (currentDF, colName) =>
    logger.info("using col " + colName)
    handleBias(currentDF, colName)
  }
  .drop("cnt_foo_eq_1")

// pack both statistics per column into a single map column: value -> [pre, pre2]
val combined = (inputToDrop ++ inputToBias).toSet
  .foldLeft(res) { (currentDF, colName) =>
    currentDF.withColumn(
      "combined_" + colName,
      map(col(colName), array(col("pre_" + colName), col("pre2_" + colName)))
    )
  }

val columnsToUse = combined.select(
  combined.columns
    .filter(_.startsWith("combined_"))
    .map(combined(_)): _*
)

val newNames = columnsToUse.columns.map(_.split("combined_").last)
val renamed = columnsToUse.toDF(newNames: _*)

val cols = renamed.columns
val localData = renamed.collect

// per-column lookup tables: column -> (value -> Seq(pre, pre2))
val columnsMap = cols.map { colName =>
    colName -> localData.flatMap(_.getAs[Map[String, Seq[Double]]](colName)).toMap
}.toMap
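
The collected columnsMap can then be fed back through joinUDF; a hypothetical call (typedLit needs Spark 2.2+, and col1_encoded is an illustrative name):

// encode col1 with coding variant 0 using the collected lookup tables
val encoded = df.withColumn("col1_encoded",
  joinUDF(lit("col1"), col("col1"), lit(0), typedLit(columnsMap)))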

Answer

values for each column could be calculated independently from other columns

While this is true, it doesn't really help your case. You can generate a number of independent DataFrames, each with its own additions, but that doesn't mean you can automatically combine them into a single execution plan.

Each application of handleBias shuffles your data twice, and the output DataFrames don't have the same data distribution as the parent DataFrame. This is why, when you fold over the list of columns, each addition has to be performed separately.
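
These shuffles show up directly in the physical plan: each Window partitioning contributes its own exchange, executed one after another. A quick way to see this, reusing the question's definitions:

// expect one Exchange per Window partitioning key in the printed plan
handleBias(handleBias(newDF, "col1"), "col2").explain()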

Theoretically, you could design a pipeline that can be expressed like this (in pseudocode; a concrete Scala sketch follows the list):

  • add unique id:

df_with_id = df.withColumn("id", unique_id())

  • compute each df independently and convert to long format:

    dfs = for (c in columns) 
      yield handle_bias(df, c).withColumn(
        "pres", explode([(pre_name, pre_value), (pre2_name, pre2_value)])
      )
    

  • union all partial results:

    combined = dfs.reduce(union)
    

  • pivot to convert from long to wide format:

    combined.groupBy("id").pivot("pres._1").agg(first("pres._2"))
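
Put together, a rough Scala rendering of those four steps could look like this. It is a sketch under the question's definitions (newDF, handleBias, inputToDrop, inputToBias); monotonically_increasing_id, the multi-alias on explode, and Dataset.union assume Spark 2.x, and dfWithId/parts/combinedLong/wide are illustrative names:

val dfWithId = newDF.withColumn("id", monotonically_increasing_id())

// step 2: one long-format (id, name, value) DataFrame per column
val parts = (inputToDrop ++ inputToBias).toSet.toSeq.map { c =>
  handleBias(dfWithId, c).select(
    col("id"),
    explode(map(
      lit("pre_" + c), col("pre_" + c),
      lit("pre2_" + c), col("pre2_" + c)
    )).as(Seq("name", "value"))
  )
}

// step 3: union all partial results
val combinedLong = parts.reduce(_ union _)

// step 4: pivot back from long to wide, one column per statistic
val wide = combinedLong.groupBy("id").pivot("name").agg(first("value"))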
    

but I doubt it is worth all the fuss. The process you use is extremely heavy as it is, and requires significant network and disk IO.

If the number of total levels (sum(count(distinct x) for x in columns)) is relatively low, you can try to compute all statistics in a single pass, for example with aggregateByKey and a Map[Tuple2[_, _], StatCounter]; otherwise consider downsampling to a level where you can compute the statistics locally.
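
A minimal sketch of that single-pass idea, keyed by (column, value) and tracking the target with a StatCounter. The key layout and the collect step are assumptions for illustration, not the answer's exact design:

import org.apache.spark.util.StatCounter

val biasColumns = Seq("col1", "col2")

// one pass over the data: fold the target into a StatCounter per (column, value)
val stats = df.rdd
  .flatMap { row =>
    val t = row.getAs[Int]("TARGET").toDouble
    biasColumns.map(c => ((c, row.getAs[String](c)), t))
  }
  .aggregateByKey(new StatCounter())(
    (acc, v) => acc.merge(v), // fold one value into a partition-local counter
    (a, b) => a.merge(b)      // merge counters across partitions
  )
  .collectAsMap()             // only safe when the total number of levels is low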
