Spark get all rows with same values in array in column
Question
I have a Spark Dataframe with columns id and hashes, where the column hashes contains a Seq of integer values of length n. Example:
+---+---------------+
|id |hashes         |
+---+---------------+
|0  |[1, 2, 3, 4, 5]|
|1  |[1, 5, 3, 7, 9]|
|2  |[9, 3, 6, 8, 0]|
+---+---------------+
I want to get a dataframe with all the rows for which the arrays in hashes match in at least one position. More formally, I want a dataframe with an additional column matches that, for each row r, contains a Seq of the ids of rows k where hashes[r][i] == hashes[k][i] for at least one value of i, with k being any other row.
For my example data, the result would be:
+---+---------------+-------+
|id |hashes |matches|
+---+---------------+-------+
|0 |[1, 2, 3, 4, 5]|[1] |
|1 |[1, 5, 3, 7, 9]|[0] |
|2 |[9, 3, 6, 8, 0]|[] |
+---+---------------+-------+
In Spark 3, the following code compares arrays between rows, keeping only rows where the two arrays share at least one element at the same position. df
is your input dataframe:
df.join(
    df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
    exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
  )
  .groupBy("id")
  .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
  .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
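For reference, the same approach can be written as a self-contained sketch with the imports it needs. The local SparkSession setup and the sample data are assumptions added here for runnability, not part of the original answer:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, col, collect_list, exists, filter, first}

// Local session, only for trying the snippet out
val spark = SparkSession.builder().master("local[*]").appName("hash-matches").getOrCreate()
import spark.implicits._

// Sample data from the question
val df = Seq(
  (0, Seq(1, 2, 3, 4, 5)),
  (1, Seq(1, 5, 3, 7, 9)),
  (2, Seq(9, 3, 6, 8, 0))
).toDF("id", "hashes")

val result = df
  .join(
    df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
    // keep pairs of rows sharing at least one element at the same position
    exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
  )
  .groupBy("id")
  .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
  // a row always matches itself, so drop its own id from the matches
  .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
  .orderBy("id")

result.show(false)
```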
Detailed description
First, we perform a self cross join, filtered by your condition that at least one element sits in the same position in the two hashes arrays.
To build the condition, we zip the two hashes arrays, one from the first dataframe and one from the second, joined dataframe, which is just the first dataframe with its columns renamed. By zipping, we get an array of {"hashes": x, "hashes2": y} structs, and we then only need to check whether this array contains an element where x = y. The complete condition is written as follows:
exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
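The zip-then-exists logic can be illustrated on plain Scala collections (a hypothetical helper for illustration only, not Spark code):

```scala
// Positional-match check on plain Scala Seqs, mirroring
// exists(arrays_zip(...), x => x("hashes") === x("hashes2"))
def matchesAtSamePosition(a: Seq[Int], b: Seq[Int]): Boolean =
  a.zip(b).exists { case (x, y) => x == y }

// matchesAtSamePosition(Seq(1, 2, 3, 4, 5), Seq(1, 5, 3, 7, 9))  // true: positions 0 and 2
// matchesAtSamePosition(Seq(1, 2, 3, 4, 5), Seq(9, 3, 6, 8, 0))  // false: no shared position
```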
Then, we aggregate by the id column to collect all id2 values of the rows that were kept, meaning the rows that match your condition. To keep the "hashes" column: since two rows with the same "id" have equal "hashes", we take the first occurrence of "hashes" for each "id". We collect all the "id2" values using collect_list:
.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
And finally, we filter out the id of the current row from the "matches" column:
.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
If you need the "id" column to be in order, you can add an orderBy clause:
.orderBy("id")
Run
With a dataframe df
containing the following values:
+---+---------------+
|id |hashes |
+---+---------------+
|0 |[1, 2, 3, 4, 5]|
|1 |[1, 5, 3, 7, 9]|
|2 |[9, 3, 6, 8, 0]|
+---+---------------+
You get the following output:
+---+---------------+-------+
|id |hashes |matches|
+---+---------------+-------+
|0 |[1, 2, 3, 4, 5]|[1] |
|1 |[1, 5, 3, 7, 9]|[0] |
|2 |[9, 3, 6, 8, 0]|[] |
+---+---------------+-------+
Limits
The join is a cartesian product, which is very expensive. Although the condition filters the results, it can lead to a huge amount of computation/shuffle on big datasets and may perform very poorly.
If you use a Spark version before 3.0, you have to replace some built-in Spark functions, in particular the higher-order functions exists and filter used with Scala lambdas, by user-defined functions.
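For example, on Spark 2.x the join condition and the final filter could be expressed with UDFs along these lines. This is a hedged sketch: the names positionalMatch and removeSelf are made up here, and the snippet has not been validated against an actual 2.x cluster:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Replaces exists(arrays_zip(...), ...): true if the two arrays share
// at least one element at the same position
val positionalMatch = udf { (a: Seq[Int], b: Seq[Int]) =>
  a.zip(b).exists { case (x, y) => x == y }
}

// Replaces filter(col("matches"), x => x.notEqual(col("id"))):
// drop the row's own id from its list of matches
val removeSelf = udf { (matches: Seq[Int], id: Int) =>
  matches.filterNot(_ == id)
}

// Usage in the join condition and the final column:
// .join(renamed, positionalMatch(col("hashes"), col("hashes2")))
// .withColumn("matches", removeSelf(col("matches"), col("id")))
```

Note that UDFs are opaque to the Catalyst optimizer, so this variant is typically slower than the built-in higher-order functions available from Spark 3.0.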