Implementing a recursive algorithm in pyspark to find pairings within a dataframe

Question

I have a Spark dataframe (prof_student_df) that lists student/professor pairs for each timestamp. There are 4 professors and 4 students for each timestamp, and each professor-student pair has a "score" (so there are 16 rows per time frame). For each time frame, I need to find the one-to-one pairing between professors and students that maximizes the overall score. Each professor can only be matched with one student for a single time frame.

For example, here are the pairings/scores for one time frame.

+------------+--------------+------------+-------+----------+
|    time    | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |
+------------+--------------+------------+-------+----------+

The goal is to get this is_match column. It can be a boolean, a 0/1 bit, or whatever works.

In the above example, p1 matched with s2, p2 matched with s1, p3 matched with s4 and p4 matched with s3, because that is the combination that maximizes the total score (yielding a score of 2.55). There is one weird edge case: it is possible to have LESS than 4 professors or students for a given time frame. If there are 4 professors and 3 students, then 1 professor would be without a pairing and all of his is_match values would be false. Similarly, if there are 3 professors and 4 students, then 1 student would be without a pairing and all of his is_match values would be false.
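
For reference, the 2.55 figure can be verified by brute force over all 4! = 24 possible one-to-one pairings (a small standalone Python check, independent of Spark):

from itertools import permutations

# scores from the table above: rows are p1..p4, columns are s1..s4
scores = [[0.7, 0.5, 0.3,  0.2],
          [0.9, 0.1, 0.15, 0.2],
          [0.2, 0.3, 0.4,  0.8],
          [0.2, 0.3, 0.35, 0.4]]
best = max(permutations(range(4)), key=lambda p: sum(scores[i][p[i]] for i in range(4)))
print(best)                                        # (1, 0, 3, 2) -> p1-s2, p2-s1, p3-s4, p4-s3
print(sum(scores[i][best[i]] for i in range(4)))   # ~2.55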

Does anyone know how I might accomplish this? I am thinking I would partition or group by time and then feed the data into some UDF that spits out the pairings, and then maybe I would have to join that back to the original rows (although I am not sure). I am trying to implement this logic in pyspark and can use Spark SQL or the pyspark API.

Ideally, I would like this to be as efficient as possible as there will be millions of rows. In the question, I mentioned a recursive algorithm because this is a traditional recursive type problem, but if there is a quicker solution that doesn't use recursion I am open to that.

Many thanks, I am new to Spark and a little stumped on how to do this.

Clarifying the question, as I realize I did not specify this in my example: for a single day, there will be up to 14 professors and 14 students to choose from. I am only looking at one day at a time, which is why I didn't include the date in the dataframe. At any one time frame, there are at most 4 professors and 4 students. This dataframe just shows one time frame, but for the next time frame it is possible that the 4 professors are p5, p1, p7, p9 or something like that, while the students might still be s1, s2, s3, s4.

Answer

As discussed in the comments, to fix the issue mentioned in your update, we can convert student_id at each time into a generalized sequence id using dense_rank, go through Step-1 to Step-3 (using the student column), and then use a join to convert student at each time back to the original student_id; see Step-0 and Step-4 below. For example, a frame whose four students happen to be s5, s9, s11 and s14 (a hypothetical frame, not in the sample data) would be mapped by dense_rank to 1, 2, 3, 4, so the pivot in Step-1 always produces the same four columns. In case there are fewer than 4 professors in a time unit, the matrix is padded to 4x4 on the NumPy side (using np.vstack() and np.zeros()); see the updated function find_assigned.

You can try pandas_udf with scipy.optimize.linear_sum_assignment (note: the backend method is the Hungarian algorithm, as mentioned by @cronoik in the main comments), see below:

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
    ('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
    ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
    ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
    ('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]
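
Before wiring things into Spark, here is a quick standalone sanity check (plain NumPy/SciPy, reusing the imports above) that linear_sum_assignment applied to the negated 4x4 score matrix of the example frame recovers the expected pairing and total of 2.55:

# rows are p1..p4, columns are s1..s4 (same scores as the sample dataframe)
scores = np.array([[0.7, 0.5, 0.3,  0.2],
                   [0.9, 0.1, 0.15, 0.2],
                   [0.2, 0.3, 0.4,  0.8],
                   [0.2, 0.3, 0.35, 0.4]])
row_ind, col_ind = linear_sum_assignment(-scores)   # minimize -score == maximize score
print(col_ind)                                      # [1 0 3 2] -> p1-s2, p2-s1, p3-s4, p4-s3
print(scores[row_ind, col_ind].sum())               # ~2.55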

Step-0: Add an extra column student, and create a new dataframe df3 with all unique combos of time + student_id + student.

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
df.show()
+----------+------------+----------+-----+-------+                              
|      time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|
+----------+------------+----------+-----+-------+

df3 = df.select('time','student_id','student').dropDuplicates()
df3.show()
+----------+----------+-------+                                                 
|      time|student_id|student|
+----------+----------+-------+
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|
+----------+----------+-------+

Step-1: Use pivot to build the matrix of professors vs. students. Notice that we store the negated scores as the pivot values, so that scipy.optimize.linear_sum_assignment, which minimizes the total cost of an assignment problem, ends up maximizing the total score:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
df1.show()
+----------+------------+----+----+-----+----+
|      time|professor_id|   1|   2|    3|   4|
+----------+------------+----+----+-----+----+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+

Step-2: Use pandas_udf and scipy.optimize.linear_sum_assignment to get the column indices of the optimal assignment, and then assign the corresponding column name to a new column assigned:

# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# since the number of students is always N, we can use np.vstack to pad the matrix up to N*N
# below, `n` is the number of professors/rows in pdf
# sz is the size of the input matrix, sz=4 in this example
def __find_assigned(pdf, sz):
  cols = pdf.columns[2:]
  n = pdf.shape[0]
  n1 = pdf.iloc[:,2:].fillna(0).values
  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
  return pdf.assign(assigned=[cols[i] for i in idx][:n])

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
df2.show()
+----------+------------+----+----+-----+----+--------+
|      time|professor_id|   1|   2|    3|   4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|
+----------+------------+----+----+-----+----+--------+
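
Side note: on Spark 3.0+, the PandasUDFType.GROUPED_MAP style used above is deprecated; the same grouped-map function can be applied with applyInPandas instead (a minimal sketch, assuming Spark 3.0+, with the schema and logic unchanged):

# Spark 3.0+ equivalent of the GROUPED_MAP pandas_udf above
df2 = df1.groupby('time').applyInPandas(lambda pdf: __find_assigned(pdf, N), schema=schema)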

Note: per the suggestion from @OluwafemiSule, we can use the parameter maximize instead of negating the score values (in that case, Step-1 should pivot on first('score') without the minus sign, and Step-3 should not negate the values back). This parameter is available in SciPy 1.4.0+:

  _, idx = linear_sum_assignment(np.vstack((n1, np.zeros((sz-n, sz)))), maximize=True)

Step-3: Use the SparkSQL stack function to unpivot the above df2, negate the score values back to positive, and filter out rows whose score is NULL. The desired is_match column is simply assigned == student:

df_new = df2.selectExpr(
  'time',
  'professor_id',
  'assigned',
  'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))

df_new.show()
+----------+------------+--------+-------+-----+--------+
|      time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
+----------+------------+--------+-------+-----+--------+
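
For reference, the stack expression built by the format string in the selectExpr above expands to the following for cols_student = [1, 2, 3, 4]; each int('i'), -`i` pair emits one (student, score) row per student column, with the score negated back to positive:

stack(4,int('1'), -`1`,int('2'), -`2`,int('3'), -`3`,int('4'), -`4`) as (student, score)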

Step-4: Use join to convert student back to student_id (use a broadcast join if possible; see the sketch after the code below):

df_new = df_new.join(df3, on=["time", "student"])
df_new.show()
+----------+-------+------------+--------+-----+--------+----------+            
|      time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|
+----------+-------+------------+--------+-----+--------+----------+

df_new = df_new.drop("student", "assigned")
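
As mentioned in Step-4, if df3 is small enough to fit on the executors, the join can be given an explicit broadcast hint (a sketch of the hinted variant):

from pyspark.sql.functions import broadcast

# same Step-4 join, but hinting Spark to broadcast the small per-time lookup table df3
df_new = df_new.join(broadcast(df3), on=["time", "student"])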
