Creating indices for each group in Spark dataframe


Problem description


I have a dataframe in Spark with 2 columns, group_id and value, where value is a double. I would like to group the data based on the group_id, order each group by value, and then add a third column index that represents the position of value in the ordering of values for the group.

For example, consider the following input data:

+--------+-----+
|group_id|value|
+--------+-----+
|1       |1.3  |
|2       |0.8  |
|1       |3.4  |
|1       |-1.7 |
|2       |2.3  |
|2       |5.9  |
|1       |2.7  |
|1       |0.0  |
+--------+-----+

The output would then be something like

+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|1       |-1.7 |1    |
|1       |0.0  |2    |
|1       |1.3  |3    |
|1       |2.7  |4    |
|1       |3.4  |5    |
|2       |0.8  |1    |
|2       |2.3  |2    |
|2       |5.9  |3    |
+--------+-----+-----+

It does not matter whether the index is 0-based or 1-based, or whether the sort is ascending or descending.
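
For reference, a minimal sketch of building this example input (assuming a SparkSession is already available as spark; the variable name df is only illustrative) might look like:

from pyspark.sql import SparkSession

# Assumed to exist already in your environment; shown only for completeness
spark = SparkSession.builder.appName("group-index-example").getOrCreate()

# Recreate the example input with columns group_id and value
df = spark.createDataFrame(
    [(1, 1.3), (2, 0.8), (1, 3.4), (1, -1.7),
     (2, 2.3), (2, 5.9), (1, 2.7), (1, 0.0)],
    ["group_id", "value"],
)
df.show()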

As a follow-up, consider the case where there is a third column, extra, in the original data that takes on multiple values for some (group_id, value) combinations. An example is:

+--------+-----+-----+
|group_id|value|extra|
+--------+-----+-----+
|1       |1.3  |1    |
|1       |1.3  |2    |
|2       |0.8  |1    |
|1       |3.4  |1    |
|1       |3.4  |2    |
|1       |3.4  |3    |
|1       |-1.7 |1    |
|2       |2.3  |1    |
|2       |5.9  |1    |
|1       |2.7  |1    |
|1       |0.0  |1    |
+--------+-----+-----+

Is there a way to add an index column such that the extra column is not considered but still kept? The output in this case would be

+--------+-----+-----+-----+
|group_id|value|extra|index|
+--------+-----+-----+-----+
|1       |-1.7 |1    |1    |
|1       |0.0  |1    |2    |
|1       |1.3  |1    |3    |
|1       |1.3  |2    |3    |
|1       |2.7  |1    |4    |
|1       |3.4  |1    |5    |
|1       |3.4  |2    |5    |
|1       |3.4  |3    |5    |
|2       |0.8  |1    |1    |
|2       |2.3  |1    |2    |
|2       |5.9  |1    |3    |
+--------+-----+-----+-----+

I know that it is possible to do this by:

  1. Duplicating the data
  2. Dropping the extra column
  3. Performing a distinct operation, which would result in the data from the original example
  4. Computing the index column using the original solution
  5. Joining the result with the data from the second example

However, this would involve a lot of extra computation and overhead.
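
For comparison, a rough sketch of that workaround (assuming the data with the extra column is loaded as df_extra; the names are only illustrative) could look like:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Steps 1-3: keep only (group_id, value) and deduplicate
deduped = df_extra.select("group_id", "value").distinct()

# Step 4: compute the index on the deduplicated data
window = Window.partitionBy("group_id").orderBy("value")
indexed = deduped.select("*", rank().over(window).alias("index"))

# Step 5: join the index back onto the full data, keeping the extra column
result = df_extra.join(indexed, on=["group_id", "value"], how="left")
result.show()

The distinct and the join both force extra shuffles, which is where the extra computation and overhead come from.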

Solution

You can use Window functions to create a rank column based on value, partitioned by group_id:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank
# Define window
window = Window.partitionBy(df['group_id']).orderBy(df['value'])
# Create column
df.select('*', rank().over(window).alias('index')).show()
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|       1| -1.7|    1|
|       1|  0.0|    2|
|       1|  1.3|    3|
|       1|  2.7|    4|
|       1|  3.4|    5|
|       2|  0.8|    1|
|       2|  2.3|    2|
|       2|  5.9|    3|
+--------+-----+-----+

Because you select '*' first, the code above keeps all other columns as well. However, your second example shows that you are looking for the function dense_rank(), which gives a rank column with no gaps:

df.select('*', dense_rank().over(window).alias('index'))
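
Applied to the follow-up example (again assuming it is available as a DataFrame named df_extra, which is only an illustrative name), a usage sketch would be:

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# Same window definition, ignoring the extra column entirely
window = Window.partitionBy(df_extra['group_id']).orderBy(df_extra['value'])
df_extra.select('*', dense_rank().over(window).alias('index')).show()

The resulting index column should match the expected output in the question: tied values (such as the three rows with value 3.4 in group 1) share the same index, and dense_rank() leaves no gaps after them.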

