Homemade DataFrame aggregation/dropDuplicates Spark

Problem Description

I want to perform a transformation on my DataFrame df so that each key appears once and only once in the final DataFrame.

For machine learning purposes, I don't want to have a bias in my dataset. This should never occur, but the data I get from my data source contains this "weirdness". So if I have lines with the same keys, I want to be able to choose either a combination of the two (like the mean value), a string concatenation (labels for example), or a randomly chosen set of values.

Say my DataFrame df looks like this:

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   U|     THOMAS|        2|
|  A|   U|    MICHAEL|        3|
|  A|   V|        TOM|        2|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
|  A|   W|     JULIEN|        3|
+---+----+-----------+---------+

I want my final DataFrame out to keep only one set of values per key, chosen randomly. It could be another type of aggregation (say, the concatenation of all values as a string), but I just don't want to build an Integer value from it; rather, I want to build new entries.

E.g. a final output could be (keeping only the first row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|     PIERRE|        1|
|  A|   V|        TOM|        2|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+

Another final output could be (keeping a random row per key):

+---+----+-----------+---------+
|ID1| ID2|       VAL1|     VAL2|
+---+----+-----------+---------+
|  A|   U|    MICHAEL|        3|
|  A|   V|       JACK|        3|
|  A|   W|     MICHEL|        2|
+---+----+-----------+---------+
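
As an aside, one common way to produce this kind of random-row-per-key output is a window ordered by rand(); the following is only a sketch, assuming the example df above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

// Number the rows of each (ID1, ID2) group in a random order and keep row 1
val w = Window.partitionBy("ID1", "ID2").orderBy(rand())

val randomPerKey = df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")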

Or, building a new set of values:

+---+----+--------------------------+----------+
|ID1| ID2|                      VAL1|      VAL2|
+---+----+--------------------------+----------+
|  A|   U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
|  A|   V|               (TOM, JACK)|    (2, 3)|
|  A|   W|          (MICHEL, JULIEN)|    (2, 3)|
+---+----+--------------------------+----------+
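
Similarly, a sketch of how this grouped-sets output could be built (again assuming the example df, with the column names hard-coded for brevity):

import org.apache.spark.sql.functions.{col, collect_list}

// Collect the values of each value column into one list per key
val groupedSets = df
  .groupBy(col("ID1"), col("ID2"))
  .agg(
    collect_list(col("VAL1")).as("VAL1"),
    collect_list(col("VAL2")).as("VAL2"))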

The answer should use Spark with Scala. I also want to underline that the actual schema is way more complicated than that, and I would like to reach a generic solution. Also, I do not want to fetch only unique values from one column, but to filter out lines that have the same keys. Thanks!

EDIT: This is what I tried to do (but Row.get(colname) throws a NoSuchElementException: key not found ...):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.DataType

def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
  // Map every field name to its (index, type) in the schema
  val fields_map: Map[String, (Int, DataType)] =
    df.schema.fieldNames.map { fname =>
      val findex = df.schema.fieldIndex(fname)
      val ftype = df.schema.fields(findex).dataType
      (fname, (findex, ftype))
    }.toMap

  df.sparkSession.createDataFrame(
    df.rdd
      // Key each row by the concatenation of its key-column values,
      // then keep one random row per key via the Utils.randomElement helper
      .map[(String, Row)](r =>
        (colnames.map(colname => r.get(fields_map(colname)._1).toString.replace("`", ""))
          .reduceLeft((x, y) => "" + x + y), r))
      .groupByKey()
      .map { case (x: String, y: Iterable[Row]) => Utils.randomElement(y) },
    df.schema)
}
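
For comparison, here is a self-contained sketch of the same RDD-based idea; it is an illustration rather than the original code, and it assumes the key columns exist in the schema:

import scala.util.Random
import org.apache.spark.sql.DataFrame

def dropDuplicatesRandom(df: DataFrame, keyCols: Seq[String]): DataFrame = {
  // Resolve the key-column positions once; fails fast if a name is missing
  val keyIdx = keyCols.map(df.schema.fieldIndex)

  val deduped = df.rdd
    // Key each row by its key-column values, joined with an unlikely separator
    .keyBy(r => keyIdx.map(i => String.valueOf(r.get(i))).mkString("\u0001"))
    .groupByKey()
    // Keep one row at random per key
    .map { case (_, rows) =>
      val seq = rows.toSeq
      seq(Random.nextInt(seq.size))
    }

  df.sparkSession.createDataFrame(deduped, df.schema)
}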

Recommended Answer

Here is one approach:

val df = Seq(
  ("A", "U", "PIERRE", 1),
  ("A", "U", "THOMAS", 2),
  ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2),
  ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2),
  ("A", "W", "JULIEN", 3)
).toDF("ID1", "ID2", "VAL1", "VAL2")

import org.apache.spark.sql.functions._

// Gather key/value column lists based on specific filtering criteria
val keyCols = df.columns.filter(_.startsWith("ID"))
val valCols = df.columns diff keyCols

// Group by keys to aggregate combined value-columns then re-expand
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select($"ID1", $"ID2", $"VALS.*")

// +---+---+------+----+
// |ID1|ID2|  VAL1|VAL2|
// +---+---+------+----+
// |  A|  W|MICHEL|   2|
// |  A|  V|   TOM|   2|
// |  A|  U|PIERRE|   1|
// +---+---+------+----+
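
One possible tweak, since the question asks for a generic solution: the final select can be driven by keyCols instead of hard-coding ID1/ID2 (a sketch with the same logic as above):

// Same aggregation, but the key columns are supplied generically
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select((keyCols :+ "VALS.*").map(col): _*)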

[UPDATE]

If I understand your expanded requirement correctly, you're looking for a generic way to transform dataframes by keys with an arbitrary agg function, like:

import org.apache.spark.sql.Column

def customAgg(keyCols: Seq[String], valCols: Seq[String], aggFcn: Column => Column) = {
  df.groupBy(keyCols.map(col): _*).
    agg(aggFcn(struct(valCols.map(col): _*)).as("VALS")).
    select($"ID1", $"ID2", $"VALS.*")
}

customAgg(keyCols, valCols, first)

I'd say that going down this path would result in a very limited set of applicable agg functions. While the above works for first, you would have to implement it differently for, say, collect_list/collect_set, etc. One can certainly hand-roll all the various types of agg functions, but that would likely result in unwarranted code-maintenance hassle.
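
To illustrate that point, here is a sketch of what collect_list over the struct would involve: the aggregate is an array of structs, so re-expanding it needs a generator such as inline rather than a plain VALS.* select (an illustration, not a drop-in replacement):

// collect_list over the struct yields an array of structs per key ...
val collected = df.groupBy(keyCols.map(col): _*).
  agg(collect_list(struct(valCols.map(col): _*)).as("VALS"))

// ... which has to be re-expanded with a generator to get the columns back
collected.select(keyCols.map(col) :+ expr("inline(VALS)"): _*)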
