Creating an indicator array based on other data frame's column values in PySpark
Question
I have two data frames. df1:
+---+-----------------+
|id1|           items1|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+
and df2:
+---+-----------------+
|id2|           items2|
+---+-----------------+
|001|           [A, C]|
|002|              [D]|
|003|        [E, A, B]|
|004|        [B, D, C]|
|005|           [F, B]|
|006|           [G, E]|
+---+-----------------+
I would like to create an indicator vector (in a new column result_array in df1) based on the values in items2. The vector should have the same length as the number of rows in df2 (in this example it should have 6 elements). An element should be 1.0 if the row in items1 contains all the elements of the corresponding row in items2, and 0.0 otherwise. The result should look as follows:
+---+-----------------+-------------------------+
|id1|           items1|             result_array|
+---+-----------------+-------------------------+
|  0|     [B, C, D, E]|[0.0,1.0,0.0,1.0,0.0,0.0]|
|  1|        [E, A, C]|[1.0,0.0,0.0,0.0,0.0,0.0]|
|  2|     [F, A, E, B]|[0.0,0.0,1.0,0.0,1.0,0.0]|
|  3|        [E, G, A]|[0.0,0.0,0.0,0.0,0.0,1.0]|
|  4|  [A, C, E, B, D]|[1.0,1.0,1.0,1.0,0.0,0.0]|
+---+-----------------+-------------------------+
For example, in row 0, the second value is 1.0 because [D] is a subset of [B, C, D, E], and the fourth value is 1.0 because [B, D, C] is a subset of [B, C, D, E]. All other item groups in df2 are not subsets of [B, C, D, E], thus their indicator values are 0.0.
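(As a quick illustration of the subset rule with plain Python sets, using row 0 of the example data:)
row0 = {'B', 'C', 'D', 'E'}
print({'D'} <= row0)            # True  -> 1.0 (second element)
print({'B', 'D', 'C'} <= row0)  # True  -> 1.0 (fourth element)
print({'F', 'B'} <= row0)       # False -> 0.0 (fifth element)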
I've tried to create a list of all item groups in items2 using collect() and then apply a udf, but my data is too large (over 10 million rows).
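(For reference, that attempt presumably looked something like the sketch below; the names are illustrative reconstructions, not from the original post.)
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType

# Pull every items2 group to the driver; scanning this list inside a udf
# for every row of df1 is what stops scaling at tens of millions of rows.
all_groups = [row['items2'] for row in df2.select('items2').collect()]

indicator_udf = F.udf(
    lambda items: [1.0 if set(g) <= set(items) else 0.0 for g in all_groups],
    ArrayType(FloatType()))

df1.withColumn('result_array', indicator_udf('items1'))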
Answer
You can proceed like this:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    (0, ['B', 'C', 'D', 'E']),
    (1, ['E', 'A', 'C']),
    (2, ['F', 'A', 'E', 'B']),
    (3, ['E', 'G', 'A']),
    (4, ['A', 'C', 'E', 'B', 'D'])],
    ['id1', 'items1'])

# Plain integers for id2: leading-zero literals such as 001 are a syntax
# error in Python 3, and the ids print as 1..6 in the output below anyway.
df2 = spark.createDataFrame([
    (1, ['A', 'C']),
    (2, ['D']),
    (3, ['E', 'A', 'B']),
    (4, ['B', 'D', 'C']),
    (5, ['F', 'B']),
    (6, ['G', 'E'])],
    ['id2', 'items2'])
This gives you the dataframes:
+---+---------------+
|id1|         items1|
+---+---------------+
|  0|   [B, C, D, E]|
|  1|      [E, A, C]|
|  2|   [F, A, E, B]|
|  3|      [E, G, A]|
|  4|[A, C, E, B, D]|
+---+---------------+

+---+---------+
|id2|   items2|
+---+---------+
|  1|   [A, C]|
|  2|      [D]|
|  3|[E, A, B]|
|  4|[B, D, C]|
|  5|   [F, B]|
|  6|   [G, E]|
+---+---------+
Now, crossJoin the two dataframes, which gives you the cartesian product of df1 with df2. Then groupby on 'id1' and 'items1', collect the items2 groups with collect_list, and apply a udf to get the 'result_array'.
# <= tests "is a subset of" (including equality); a strict subset test (<)
# would wrongly return 0.0 when an items2 group equals the whole items1 set.
get_array_udf = F.udf(lambda x, y: [1.0 if set(z) <= set(x) else 0.0 for z in y],
                      ArrayType(FloatType()))

df = df1.crossJoin(df2)\
    .groupby(['id1', 'items1']).agg(F.collect_list('items2').alias('items2'))\
    .withColumn('result_array', get_array_udf('items1', 'items2')).drop('items2')
df.show(truncate=False)
This gives you the output:
+---+---------------+------------------------------+
|id1|items1         |result_array                  |
+---+---------------+------------------------------+
|1  |[E, A, C]      |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|0  |[B, C, D, E]   |[0.0, 1.0, 0.0, 1.0, 0.0, 0.0]|
|4  |[A, C, E, B, D]|[1.0, 1.0, 1.0, 1.0, 0.0, 0.0]|
|3  |[E, G, A]      |[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|2  |[F, A, E, B]   |[0.0, 0.0, 1.0, 0.0, 1.0, 0.0]|
+---+---------------+------------------------------+
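One caveat: collect_list makes no ordering guarantee after a shuffle, so the positions in result_array are not strictly guaranteed to follow id2. If you need a deterministic order, one option (a sketch, not part of the original answer) is to collect (id2, items2) structs and sort them before building the array:
get_array_udf = F.udf(
    lambda x, pairs: [1.0 if set(p['items2']) <= set(x) else 0.0 for p in pairs],
    ArrayType(FloatType()))

# sort_array on an array of structs sorts by the first struct field (id2),
# pinning each items2 group to a fixed position in the result.
df = df1.crossJoin(df2)\
    .groupby(['id1', 'items1'])\
    .agg(F.sort_array(F.collect_list(F.struct('id2', 'items2'))).alias('pairs'))\
    .withColumn('result_array', get_array_udf('items1', 'pairs')).drop('pairs')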