Homemade DataFrame aggregation/dropDuplicates Spark
Question
I want to perform a transformation on my DataFrame df so that I only have each key once and only once in the final DataFrame.
For machine learning purposes, I don't want to have a bias in my dataset. This should never occur, but the data I get from my data source contains this "weirdness". So if I have lines with the same keys, I want to be able to choose either a combination of the two (like the mean value), a string concatenation (labels, for example), or a random set of values.
Say my DataFrame df looks like this:
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| PIERRE| 1|
| A| U| THOMAS| 2|
| A| U| MICHAEL| 3|
| A| V| TOM| 2|
| A| V| JACK| 3|
| A| W| MICHEL| 2|
| A| W| JULIEN| 3|
+---+----+-----------+---------+
I want my final DataFrame out to keep only one set of values per key, randomly. It could be another type of aggregation (say, the concatenation of all values as a string), but I just don't want to build an Integer value from it; rather, build new entries.
E.g. a final output could be (keeping only the first row per key):
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| PIERRE| 1|
| A| V| TOM| 2|
| A| W| MICHEL| 2|
+---+----+-----------+---------+
Another final output could be (keeping a random row per key):
+---+----+-----------+---------+
|ID1| ID2| VAL1| VAL2|
+---+----+-----------+---------+
| A| U| MICHAEL| 3|
| A| V| JACK| 3|
| A| W| MICHEL| 2|
+---+----+-----------+---------+
Or, building a new set of values:
+---+----+--------------------------+----------+
|ID1| ID2| VAL1| VAL2|
+---+----+--------------------------+----------+
| A| U| (PIERRE, THOMAS, MICHAEL)| (1, 2, 3)|
| A| V| (TOM, JACK)| (2, 3)|
| A| W| (MICHEL, JULIEN)| (2, 3)|
+---+----+--------------------------+----------+
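For intuition, the first-row, random-row, and collected-values variants above can be sketched on plain Scala collections (no Spark involved; the tuples mirror the sample table):

```scala
import scala.util.Random

val rows = Seq(
  ("A", "U", "PIERRE", 1), ("A", "U", "THOMAS", 2), ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2), ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2), ("A", "W", "JULIEN", 3))

// Group rows by the key columns (ID1, ID2); order within each group is preserved
val byKey = rows.groupBy { case (id1, id2, _, _) => (id1, id2) }

// Variant 1: keep the first row per key
val firstPerKey = byKey.values.map(_.head).toSeq

// Variant 2: keep a random row per key
val randomPerKey = byKey.values.map(rs => rs(Random.nextInt(rs.size))).toSeq

// Variant 3: collect all values per key into lists
val collectedPerKey = byKey.map { case ((id1, id2), rs) =>
  (id1, id2, rs.map(_._3), rs.map(_._4))
}.toSeq
```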
The answer should use Spark with Scala. I also want to underline that the actual schema is way more complicated than this, and I would like to reach a generic solution. Also, I do not want to fetch only the unique values from one column, but to filter out lines that have the same keys. Thanks!
EDIT: This is what I tried to do (but Row.get(colname) throws a NoSuchElementException: key not found ...):
import scala.util.Random
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.DataType

def myDropDuplicatesRandom(df: DataFrame, colnames: Seq[String]): DataFrame = {
  // Map each column name to its (index, type) in the schema
  val fieldsMap: Map[String, (Int, DataType)] =
    df.schema.fieldNames.map { fname =>
      val findex = df.schema.fieldIndex(fname)
      val ftype  = df.schema.fields(findex).dataType
      fname -> (findex, ftype)
    }.toMap

  df.sparkSession.createDataFrame(
    df.rdd
      // Build the grouping key with a separator to avoid collisions
      // between key tuples like ("AB", "C") and ("A", "BC")
      .map(r => (colnames.map(c => r.get(fieldsMap(c)._1).toString).mkString("|"), r))
      .groupByKey()
      // Keep one random row per key
      .map { case (_, rows) => Random.shuffle(rows.toSeq).head },
    df.schema)
}
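One fragile spot in this approach is worth calling out: building the grouping key by plain string concatenation of the key columns can make distinct key tuples collide. A quick plain-Scala check of the failure mode, and of the usual fix (joining with a separator that is assumed not to occur in the values):

```scala
// Bare concatenation: distinct key tuples collapse to the same key
val k1 = Seq("AB", "C").reduceLeft(_ + _)  // "ABC"
val k2 = Seq("A", "BC").reduceLeft(_ + _)  // "ABC"
println(k1 == k2)  // true: the two keys collide

// Joining with a separator keeps them apart
val s1 = Seq("AB", "C").mkString("|")  // "AB|C"
val s2 = Seq("A", "BC").mkString("|")  // "A|BC"
println(s1 == s2)  // false
```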
Answer
Here's one approach:
val df = Seq(
  ("A", "U", "PIERRE", 1),
  ("A", "U", "THOMAS", 2),
  ("A", "U", "MICHAEL", 3),
  ("A", "V", "TOM", 2),
  ("A", "V", "JACK", 3),
  ("A", "W", "MICHEL", 2),
  ("A", "W", "JULIEN", 3)
).toDF("ID1", "ID2", "VAL1", "VAL2")
import org.apache.spark.sql.functions._
// Gather key/value column lists based on specific filtering criteria
val keyCols = df.columns.filter(_.startsWith("ID"))
val valCols = df.columns diff keyCols
// Group by keys to aggregate combined value-columns then re-expand
df.groupBy(keyCols.map(col): _*).
  agg(first(struct(valCols.map(col): _*)).as("VALS")).
  select($"ID1", $"ID2", $"VALS.*")
// +---+---+------+----+
// |ID1|ID2| VAL1|VAL2|
// +---+---+------+----+
// | A| W|MICHEL| 2|
// | A| V| TOM| 2|
// | A| U|PIERRE| 1|
// +---+---+------+----+
[UPDATE]
If I understand your expanded requirement correctly, you're looking for a generic way to transform dataframes by keys with an arbitrary agg function, like:
import org.apache.spark.sql.{Column, DataFrame}

def customAgg(df: DataFrame, keyCols: Seq[String], valCols: Seq[String],
              aggFcn: Column => Column): DataFrame = {
  df.groupBy(keyCols.map(col): _*).
    agg(aggFcn(struct(valCols.map(col): _*)).as("VALS")).
    select(keyCols.map(col) :+ col("VALS.*"): _*)
}

customAgg(df, keyCols, valCols, first)
I'd say that going down this path would result in very limited applicable agg functions. While the above works for first, you would have to implement it differently for, say, collect_list/collect_set, etc. One can certainly hand-roll all the various types of agg functions, but it would likely result in unwarranted code maintenance hassle.
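To see why one wrapper signature fits poorly, note that a first-style aggregation returns values with the same shape as a single input row, while a collect_list-style one turns each value column into a list. A plain-collections sketch of the two result shapes (not Spark, just illustrating the mismatch):

```scala
val sample = Seq(("A", "U", "PIERRE", 1), ("A", "U", "THOMAS", 2), ("A", "V", "TOM", 2))
val grouped = sample.groupBy { case (id1, id2, _, _) => (id1, id2) }

// first-style: the aggregated value keeps the per-row shape (String, Int)
val firstStyle: Map[(String, String), (String, Int)] =
  grouped.map { case (k, rs) => k -> (rs.head._3, rs.head._4) }

// collect_list-style: each value column becomes a list, so the shape changes
val listStyle: Map[(String, String), (Seq[String], Seq[Int])] =
  grouped.map { case (k, rs) => k -> (rs.map(_._3), rs.map(_._4)) }
```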