Spark get all rows with same values in array in column

Problem description

I have a Spark Dataframe with columns id and hashes, where the column hashes contains a Seq of integer values of length n. Example:

+----+--------------------+
|  id|              hashes|
+----+--------------------+
|0   |     [1, 2, 3, 4, 5]|
|1   |     [1, 5, 3, 7, 9]|
|2   |     [9, 3, 6, 8, 0]|
+----+--------------------+

I want to get a dataframe with all the rows for which the arrays in hashes match in at least one position. More formally, I want a dataframe with an additional column matches that, for each row r, contains a Seq of the ids of rows k where hashes[r][i] == hashes[k][i] for at least one value of i.

For my example data, the result would be:

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[1]    |
|1  |[1, 5, 3, 7, 9]|[0]    |
|2  |[9, 3, 6, 8, 0]|[]     |
+---+---------------+-------+

Solution

In Spark 3, the following code compares arrays between rows, keeping only rows where the two arrays share at least one element at the same position. df is your input dataframe:

    import org.apache.spark.sql.functions.{arrays_zip, col, collect_list, exists, filter, first}

    df.join(
      // self-join against a copy of df with its columns renamed
      df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
      // keep pairs of rows whose arrays agree in at least one position
      exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
    )
      .groupBy("id")
      .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))
      // drop the row's own id from its list of matches
      .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))

Detailed description

First, we perform a self cross-join, filtered by your condition: at least one element at the same position in the two hashes arrays must match.

To build the condition, we zip the two hashes arrays: one from the first dataframe and one from the second, joined dataframe, which is just the first dataframe with its columns renamed. Zipping gives an array of structs {"hashes": x, "hashes2": y}, and we then just need to check whether this array contains an element where x = y. The complete condition is written as follows:

exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
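
To see the condition in isolation, here is a minimal, hypothetical check against two literal arrays taken from rows 0 and 1 of the example (assuming a SparkSession named spark):

    import org.apache.spark.sql.functions.{array, arrays_zip, col, exists, lit}

    // build a one-row dataframe holding the two arrays to compare
    val demo = spark.range(1).select(
      array(Seq(1, 2, 3, 4, 5).map(lit): _*).as("hashes"),
      array(Seq(1, 5, 3, 7, 9).map(lit): _*).as("hashes2")
    )

    demo.select(
      // true: positions 0 (1 == 1) and 2 (3 == 3) match
      exists(arrays_zip(col("hashes"), col("hashes2")),
        x => x("hashes") === x("hashes2")).as("match")
    ).show()
    // +-----+
    // |match|
    // +-----+
    // | true|
    // +-----+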

Then, we aggregate by column id to collect all the id2 values of the rows that were kept, that is, the rows matching your condition.

To keep the "hashes" column: since two rows with the same "id" have equal "hashes" values, we take the first occurrence of "hashes" for each "id". We collect all the "id2" values using collect_list:

.agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))

And finally, we filter the id of the current row out of the "matches" column:

.withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))
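
As a quick, hypothetical illustration of this step, the same filter higher-order function can be tried directly in SQL, dropping the value 0 from the list [0, 1] just as it is dropped for row 0:

    spark.sql("SELECT filter(array(0, 1), x -> x != 0) AS matches").show()
    // +-------+
    // |matches|
    // +-------+
    // |    [1]|
    // +-------+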

If you need the rows to be ordered by "id", you can add an orderBy clause:

.orderBy("id")

Run

With a dataframe df containing the following values:

+---+---------------+
|id |hashes         |
+---+---------------+
|0  |[1, 2, 3, 4, 5]|
|1  |[1, 5, 3, 7, 9]|
|2  |[9, 3, 6, 8, 0]|
+---+---------------+
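
For reference, a minimal sketch of how such a dataframe could be built (assuming a SparkSession named spark with its implicits imported):

    import spark.implicits._

    val df = Seq(
      (0, Seq(1, 2, 3, 4, 5)),
      (1, Seq(1, 5, 3, 7, 9)),
      (2, Seq(9, 3, 6, 8, 0))
    ).toDF("id", "hashes")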

You get the following output:

+---+---------------+-------+
|id |hashes         |matches|
+---+---------------+-------+
|0  |[1, 2, 3, 4, 5]|[1]    |
|1  |[1, 5, 3, 7, 9]|[0]    |
|2  |[9, 3, 6, 8, 0]|[]     |
+---+---------------+-------+

Limitations

The join is a cartesian product, which is very expensive. Although the condition filters the results, it can lead to a huge amount of computation and shuffling on big datasets, and performance may be very poor.

If you are using a Spark version earlier than 3.0, you have to replace some of the built-in Spark functions with user-defined functions (the exists and filter higher-order functions used above were added to the Scala API in Spark 3.0).
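
For example, the join condition could be rewritten with a UDF along these lines (a sketch only: sameAtAnyPosition is a hypothetical name, and the filter step on the "matches" array would need a similar rewrite):

    import org.apache.spark.sql.functions.udf

    // true when the two arrays agree in at least one position
    val sameAtAnyPosition = udf { (a: Seq[Int], b: Seq[Int]) =>
      a.zip(b).exists { case (x, y) => x == y }
    }

    // used as the join condition instead of exists(arrays_zip(...), ...):
    // df.join(df2, sameAtAnyPosition(col("hashes"), col("hashes2")))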
