Creating an indicator array based on other data frame's column values in PySpark


Problem description

I have two data frames: df1

+---+-----------------+
|id1|           items1|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+ 

df2:

+---+-----------------+
|id2|           items2|
+---+-----------------+
|001|           [A, C]|
|002|              [D]|
|003|        [E, A, B]|
|004|        [B, D, C]|
|005|           [F, B]|
|006|           [G, E]|
+---+-----------------+ 

I would like to create an indicator vector (in a new column result_array in df1) based on values in items2. The vector should be of the same length as number of rows in df2 (in this example it should have 6 elements). Its elements should have either value of 1.0 if the row in items1 contains all the elements in the corresponding row of items2, or value 0.0 otherwise. The result should look as follows:

+---+-----------------+-------------------------+
|id1|           items1|             result_array|
+---+-----------------+-------------------------+
|  0|     [B, C, D, E]|[0.0,1.0,0.0,1.0,0.0,0.0]|
|  1|        [E, A, C]|[1.0,0.0,0.0,0.0,0.0,0.0]|
|  2|     [F, A, E, B]|[0.0,0.0,1.0,0.0,1.0,0.0]|
|  3|        [E, G, A]|[0.0,0.0,0.0,0.0,0.0,1.0]|
|  4|  [A, C, E, B, D]|[1.0,1.0,1.0,1.0,0.0,0.0]|
+---+-----------------+-------------------------+

For example, in row 0, the second value is 1.0 because [D] is a subset of [B, C, D, E] and the fourth value is 1.0 because [B, D, C] is a subset of [B, C, D, E]. All other item groups in df2 are not subsets of [B, C, D, E], thus their indicator values are 0.0.
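
The same containment test is easy to see in plain Python; here is a minimal sketch of the logic for row 0 (the variable names are only for illustration):

row0_items1 = {'B', 'C', 'D', 'E'}
items2_groups = [{'A', 'C'}, {'D'}, {'E', 'A', 'B'}, {'B', 'D', 'C'}, {'F', 'B'}, {'G', 'E'}]

# 1.0 where the items2 group is a subset of row 0's items1, 0.0 otherwise
indicator = [1.0 if group.issubset(row0_items1) else 0.0 for group in items2_groups]
print(indicator)  # [0.0, 1.0, 0.0, 1.0, 0.0, 0.0]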

I've tried to create a list of all item groups in items2 using collect() and then apply a udf but my data is too large (over 10 million rows).

Recommended answer

You can proceed as follows,

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql import SparkSession

# `sql` is the Spark entry point used below; a SparkSession works for this
sql = SparkSession.builder.getOrCreate()

df1 = sql.createDataFrame([
     (0,['B', 'C', 'D', 'E']),
     (1,['E', 'A', 'C']),
     (2,['F', 'A', 'E', 'B']),
     (3,['E', 'G', 'A']),
     (4,['A', 'C', 'E', 'B', 'D'])],
   ['id1','items1'])

df2 = sql.createDataFrame([
     # plain ints here: leading-zero literals such as 001 are a SyntaxError in Python 3
     (1, ['A', 'C']),
     (2, ['D']),
     (3, ['E', 'A', 'B']),
     (4, ['B', 'D', 'C']),
     (5, ['F', 'B']),
     (6, ['G', 'E'])],
    ['id2', 'items2'])

which gives you the data frames,

+---+---------------+
|id1|         items1|
+---+---------------+
|  0|   [B, C, D, E]|
|  1|      [E, A, C]|
|  2|   [F, A, E, B]|
|  3|      [E, G, A]|
|  4|[A, C, E, B, D]|
+---+---------------+

+---+---------+
|id2|   items2|
+---+---------+
|  1|   [A, C]|
|  2|      [D]|
|  3|[E, A, B]|
|  4|[B, D, C]|
|  5|   [F, B]|
|  6|   [G, E]|
+---+---------+

Now, crossJoin the two dataframes, which gives you the cartesian product of df1 with df2. Then, groupby on 'items1' and apply a udf to get the 'result_array'.

# 1.0 when the items2 group is contained in items1; issubset (not a strict <) so equal sets also count
get_array_udf = F.udf(lambda x, y: [1.0 if set(z).issubset(set(x)) else 0.0 for z in y], ArrayType(FloatType()))

df = df1.crossJoin(df2)\
        .groupby(['id1', 'items1']).agg(F.collect_list('items2').alias('items2'))\
        .withColumn('result_array', get_array_udf('items1', 'items2')).drop('items2')

df.show(truncate=False)

This gives you the output,

+---+---------------+------------------------------+                            
|id1|items1         |result_array                  |
+---+---------------+------------------------------+
|1  |[E, A, C]      |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|0  |[B, C, D, E]   |[0.0, 1.0, 0.0, 1.0, 0.0, 0.0]|
|4  |[A, C, E, B, D]|[1.0, 1.0, 1.0, 1.0, 0.0, 0.0]|
|3  |[E, G, A]      |[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|2  |[F, A, E, B]   |[0.0, 0.0, 1.0, 0.0, 1.0, 0.0]|
+---+---------------+------------------------------+
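
One caveat not covered by the answer above: collect_list does not guarantee element order, so on a large, multi-partition dataset the positions in result_array may not line up with df2's row order. A possible variant, sketched below under that assumption, collects (id2, items2) structs and sorts by id2 inside the udf; the names get_array_sorted_udf and pairs are illustrative only.

# Sketch: keep id2 next to items2 so the indicator positions can be ordered
# explicitly by id2 instead of relying on collect_list's implicit order.
get_array_sorted_udf = F.udf(
    lambda items1, pairs: [
        1.0 if set(r['items2']).issubset(set(items1)) else 0.0
        for r in sorted(pairs, key=lambda r: r['id2'])
    ],
    ArrayType(FloatType()))

df_sorted = df1.crossJoin(df2)\
    .groupby(['id1', 'items1'])\
    .agg(F.collect_list(F.struct('id2', 'items2')).alias('pairs'))\
    .withColumn('result_array', get_array_sorted_udf('items1', 'pairs'))\
    .drop('pairs')

df_sorted.show(truncate=False)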

