Homemade DataFrame aggregation/dropDuplicates Spark

Problem Description

I want to perform a transformation on my DataFrame df so that each key appears once and only once in the final DataFrame.

For machine learning purposes, I don't want any bias in my dataset. This should never occur, but the data I get from my data source contains this "weirdness". So if I have lines with the same keys, I want to be able to choose either a combination of the two (like the mean value), a string concatenation (for labels, for example), or a random set of values.

Say my DataFrame df looks like this:

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   U|     THOMAS|        2|
|  A|   U|    MICHAEL|        3|
|  A|   V|        TOM|        2|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
|  A|   W|     JULIEN|        3|
+---+----+-----------+---------+

I want my final DataFrame out to keep only one set of values per key, chosen randomly. It could be another type of aggregation (say, the concatenation of all values as a string), but I don't want to collapse it into a single Integer value; rather, I want to build new entries.

E.g. a final output could be (keeping only the first row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   V|        TOM|        2|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

Another final output could be (keeping a random row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|    MICHAEL|        3|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

Or, building a new set of values:

+---+----+--------------------------+----------+
|ID1| ID2|                      VAL1|      VAL2|
+---+----+--------------------------+----------+
|  A|   U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
|  A|   V|               (TOM, JACK)|    (2, 3)|
|  A|   W|          (MICHEL, JULIEN)|    (2, 3)|
+---+----+--------------------------+----------+

The answer should use Spark with Scala. I also want to stress that the actual schema is much more complicated than this, so I would like to reach a generic solution. Also, I do not want to fetch only unique values from one column; I want to filter out lines that have the same keys. Thanks!

EDIT: This is what I tried to do (but Row.get(colname) throws a NoSuchElementException: key not found...):

  def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
    // Map each schema field name to its (index, type) pair
    val fields_map: Map[String, (Int, DataType)] =
      df.schema.fieldNames.map(fname => {
        val findex = df.schema.fieldIndex(fname)
        val ftype = df.schema.fields(findex).dataType
        (fname, (findex, ftype))
      }).toMap[String, (Int, DataType)]

    // Build a string key by concatenating the key-column values of each row,
    // group rows by that key, then keep one random row per group
    // (Utils.randomElement is the author's own helper that picks one element at random)
    df.sparkSession.createDataFrame(
      df.rdd
        .map[(String, Row)](r => (colnames.map(colname => r.get(fields_map(colname)._1).toString.replace("`", "")).reduceLeft((x, y) => "" + x + y), r))
        .groupByKey()
        .map{case (x: String, y: Iterable[Row]) => Utils.randomElement(y)}
    , df.schema)
  }

Recommended Answer

Here's one approach:

// Assumes an active SparkSession named `spark` (as in spark-shell);
// its implicits provide toDF and the $ column syntax used below
import spark.implicits._

val df = Seq(
  ("A", "U", "PIERRE", 1),
  ("A", "U", "THOMAS", 2),
  ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2),
  ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2),
  ("A", "W", "JULIEN", 3)
).toDF("ID1", "ID2", "VAL1", "VAL2")

import org.apache.spark.sql.functions._

// Gather key/value column lists based on specific filtering criteria
val keyCols = df.columns.filter(_.startsWith("ID"))
val valCols = df.columns diff keyCols

// Group by keys to aggregate combined value-columns then re-expand
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select($"ID1", $"ID2", $"VALS.*")

// +---+---+------+----+
// |ID1|ID2|  VAL1|VAL2|
// +---+---+------+----+
// |  A|  W|MICHEL|   2|
// |  A|  V|   TOM|   2|
// |  A|  U|PIERRE|   1|
// +---+---+------+----+
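Since the question also allows keeping a random row per key, here is a minimal sketch of that variant, built on the same keyCols list. The Window/rand() combination is my own addition, not part of the original answer: rows are ranked in random order within each key group with row_number, and only the first-ranked row is kept.

import org.apache.spark.sql.expressions.Window

// Rank rows randomly within each key group, then keep one row per key
val byKey = Window.partitionBy(keyCols.map(col): _*).orderBy(rand())

df.withColumn("rn", row_number().over(byKey)).
  filter($"rn" === 1).
  drop("rn")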

[UPDATE]

If I understand your expanded requirement correctly, you're looking for a generic way to transform DataFrames by key with an arbitrary agg function, like:

import org.apache.spark.sql.Column

// Note: `df` is captured from the enclosing scope rather than passed as a parameter
def customAgg(keyCols: Seq[String], valCols: Seq[String], aggFcn: Column => Column) = {
  df.groupBy(keyCols.map(col): _*).
    agg(aggFcn(struct(valCols.map(col): _*)).as("VALS")).
    select($"ID1", $"ID2", $"VALS.*")
}

customAgg(keyCols, valCols, first)

I'd say that going down this path results in a very limited set of applicable agg functions. While the above works for first, you would have to implement it differently for, say, collect_list/collect_set, etc. One can certainly hand-roll all the various types of agg functions, but it would likely result in unwarranted code-maintenance hassle.
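For reference, the "set of values per key" output from the question maps naturally onto collect_list, though it needs one aggregation expression per value column rather than the single struct used above. The sketch below is my own illustration under that assumption, not code from the original answer:

// Collect every value column into a list per key (one collect_list per value column);
// each VAL column then becomes an array per (ID1, ID2) key,
// similar to the third sample output in the question
df.groupBy(keyCols.map(col): _*).
  agg(collect_list(valCols.head).as(valCols.head),
      valCols.tail.map(c => collect_list(c).as(c)): _*)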
