Pyspark - Ranking columns keeping ties


Question


I'm looking for a way to rank columns of a dataframe preserving ties. Specifically for this example, I have a pyspark dataframe as follows where I want to generate ranks for colA & colB (though I want to support being able to rank N number of columns)

 +--------+----------+-----+----+
 |  Entity|        id| colA|colB|
 +--------+----------+-----+----+
 |       a|8589934652|   21|  50|
 |       b|       112|    9|  23|
 |       c|8589934629|    9|  23|
 |       d|8589934702|    8|  21|         
 |       e|        20|    2|  21|        
 |       f|8589934657|    2|   5|          
 |       g|8589934601|    1|   5|         
 |       h|8589934653|    1|   4|          
 |       i|8589934620|    0|   4|          
 |       j|8589934643|    0|   3|         
 |       k|8589934618|    0|   3|         
 |       l|8589934602|    0|   2|         
 |       m|8589934664|    0|   2|         
 |       n|        25|    0|   1|         
 |       o|        67|    0|   1|         
 |       p|8589934642|    0|   1|         
 |       q|8589934709|    0|   1|         
 |       r|8589934660|    0|   1|         
 |       s|        30|    0|   1|         
 |       t|        55|    0|   1|         
 +--------+----------+-----+----+


What I'd like is a way to rank this dataframe where tied values receive the same rank such as:

 +--------+----------+-----+----+---------+---------+
 |  Entity|        id| colA|colB|colA_rank|colB_rank|
 +--------+----------+-----+----+---------+---------+
 |       a|8589934652|   21|  50|        1|        1|
 |       b|       112|    9|  23|        2|        2|
 |       c|8589934629|    9|  23|        2|        2|
 |       d|8589934702|    8|  21|        3|        3|        
 |       e|        20|    2|  21|        4|        3|      
 |       f|8589934657|    2|   5|        4|        4|       
 |       g|8589934601|    1|   5|        5|        4|     
 |       h|8589934653|    1|   4|        5|        5|     
 |       i|8589934620|    0|   4|        6|        5|    
 |       j|8589934643|    0|   3|        6|        6|  
 |       k|8589934618|    0|   3|        6|        6| 
 |       l|8589934602|    0|   2|        6|        7|
 |       m|8589934664|    0|   2|        6|        7|
 |       n|        25|    0|   1|        6|        8|
 |       o|        67|    0|   1|        6|        8|
 |       p|8589934642|    0|   1|        6|        8|
 |       q|8589934709|    0|   1|        6|        8|
 |       r|8589934660|    0|   1|        6|        8|
 |       s|        30|    0|   1|        6|        8|
 |       t|        55|    0|   1|        6|        8|
 +--------+----------+-----+----+---------+---------+


My current implementation with the first dataframe looks like:

 def getRanks(mydf, cols=None, ascending=False):
     from pyspark.sql import Row
     # This takes a dataframe and a list of columns to rank
     # If no list is provided, it ranks *all* columns
     # returns a new dataframe

     def addRank(ranked_rdd, col, ascending):
         # This assumes an RDD of the form (Row(...), list[...])
         # it orders the rdd by col, finds the order, then adds that to the
         # list
         myrdd = ranked_rdd.sortBy(lambda pair: pair[0][col],
                 ascending=ascending).zipWithIndex()
         return myrdd.map(lambda pair: (pair[0][0], pair[0][1] + [pair[1] + 1]))

     myrdd = mydf.rdd
     fields = myrdd.first().__fields__
     ranked_rdd = myrdd.map(lambda x: (x, []))

     if cols is None:
         cols = fields
     for col in cols:
         ranked_rdd = addRank(ranked_rdd, col, ascending)
     rank_names = [x + "_rank" for x in cols]

     # Hack to make sure columns come back in the right order
     ranked_rdd = ranked_rdd.map(lambda pair: Row(*(pair[0].__fields__ +
                  rank_names))(*(tuple(pair[0]) + tuple(pair[1]))))
     return ranked_rdd.toDF()

Which produces:

 +--------+----------+-----+----+---------+---------+
 |  Entity|        id| colA|colB|colA_rank|colB_rank|
 +--------+----------+-----+----+---------+---------+
 |       a|8589934652|   21|  50|        1|        1|
 |       b|       112|    9|  23|        2|        2|
 |       c|8589934629|    9|  23|        3|        3|
 |       d|8589934702|    8|  21|        4|        4|        
 |       e|        20|    2|  21|        5|        5|      
 |       f|8589934657|    2|   5|        6|        6|       
 |       g|8589934601|    1|   5|        7|        7|     
 |       h|8589934653|    1|   4|        8|        8|     
 |       i|8589934620|    0|   4|        9|        9|    
 |       j|8589934643|    0|   3|       10|       10|  
 |       k|8589934618|    0|   3|       11|       11|
 |       l|8589934602|    0|   2|       12|       12|
 |       m|8589934664|    0|   2|       13|       13|
 |       n|        25|    0|   1|       14|       14|
 |       o|        67|    0|   1|       15|       15|
 |       p|8589934642|    0|   1|       16|       16|
 |       q|8589934709|    0|   1|       17|       17|
 |       r|8589934660|    0|   1|       18|       18|
 |       s|        30|    0|   1|       19|       19|
 |       t|        55|    0|   1|       20|       20|
 +--------+----------+-----+----+---------+---------+


As you can see, the function getRanks() takes a dataframe, specifies the columns to be ranked, sorts them, and uses zipWithIndex() to generate an ordering or rank. However, I can't figure out a way to preserve ties.


This stackoverflow post is the closest solution I've found: rank-users-by-column But it appears to only handle 1 column (I think).


Thanks so much for the help in advance!

列"id"是通过调用monotonically_increasing_id()生成的,并且在我的实现中被强制转换为字符串.

column 'id' is generated from calling monotonically_increasing_id() and in my implementation is cast to a string.

Answer

You are looking for dense_rank.


First let's create our dataframe:

df = spark.createDataFrame(sc.parallelize([["a",8589934652,21,50],["b",112,9,23],["c",8589934629,9,23],
                ["d",8589934702,8,21],["e",20,2,21],["f",8589934657,2,5],
                ["g",8589934601,1,5],["h",8589934653,1,4],["i",8589934620,0,4],
                ["j",8589934643,0,3],["k",8589934618,0,3],["l",8589934602,0,2],
                ["m",8589934664,0,2],["n",25,0,1],["o",67,0,1],["p",8589934642,0,1],
                ["q",8589934709,0,1],["r",8589934660,0,1],["s",30,0,1],["t",55,0,1]]
), ["Entity","id","colA","colB"])

We'll define two window specs:

from pyspark.sql import Window
import pyspark.sql.functions as psf
wA = Window.orderBy(psf.desc("colA"))
wB = Window.orderBy(psf.desc("colB"))
df = df.withColumn(
    "colA_rank", 
    psf.dense_rank().over(wA)
).withColumn(
    "colB_rank", 
    psf.dense_rank().over(wB)
)

    +------+----------+----+----+---------+---------+
    |Entity|        id|colA|colB|colA_rank|colB_rank|
    +------+----------+----+----+---------+---------+
    |     a|8589934652|  21|  50|        1|        1|
    |     b|       112|   9|  23|        2|        2|
    |     c|8589934629|   9|  23|        2|        2|
    |     d|8589934702|   8|  21|        3|        3|
    |     e|        20|   2|  21|        4|        3|
    |     f|8589934657|   2|   5|        4|        4|
    |     g|8589934601|   1|   5|        5|        4|
    |     h|8589934653|   1|   4|        5|        5|
    |     i|8589934620|   0|   4|        6|        5|
    |     j|8589934643|   0|   3|        6|        6|
    |     k|8589934618|   0|   3|        6|        6|
    |     l|8589934602|   0|   2|        6|        7|
    |     m|8589934664|   0|   2|        6|        7|
    |     n|        25|   0|   1|        6|        8|
    |     o|        67|   0|   1|        6|        8|
    |     p|8589934642|   0|   1|        6|        8|
    |     q|8589934709|   0|   1|        6|        8|
    |     r|8589934660|   0|   1|        6|        8|
    |     s|        30|   0|   1|        6|        8|
    |     t|        55|   0|   1|        6|        8|
    +------+----------+----+----+---------+---------+

