Using Collect_set after exploding in a groupedBy object in Pyspark

This article covers how to use collect_set after exploding within a groupBy aggregation in PySpark; the question and answer below may be a useful reference for anyone facing the same problem.

Problem Description



I have a data-frame which has schema like this :

root
 |-- docId: string (nullable = true)
 |-- field_a: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- field_b: array (nullable = true)
 |    |-- element: string (containsNull = true)

I want to perform a groupBy on field_a and use collect_set to keep all the distinct values from field_b (basically the inner values of the lists) in the aggregation. I don't want to add a new column by exploding field_b and then doing collect_set in the aggregation.

How can I use udaf or pandas udf to achieve this?

E.g.:

+---------------------+----------------+------------+
|docId                |field_b         |field_a     |
+---------------------+----------------+------------+
|k&;+B8ROh\\NmetBg=DiR|[IDN,SGP]       |[F]         |
|k&;.]^nX7HRdjIO`>S1+ |[IND,KWT]       |[M]         |
|k&;h&)8Sd\\JrDVL%VH>N|[IDN,IND]       |[M]         |
|k&<8nTqjrYNE8taji^$u |[IND,BHR]       |[F]         |
|k&=$M5Hmd6Y>&@'co-^1 |[IND,AUS]       |[M]         |
|k&>pIZ)g^!L/ht!T\'/"f|[IDN,KWT]       |[M]         |
|k&@ZX>Ph%rPdZ[,Pqsc. |[IND,MYS]       |[F]         |
|k&A]C>dmDXVN$hiVEUk/ |[IND,PHL]       |[F]         |
|k&BX1eGhumSQ6`7A8<Zd |[IND,SAU]       |[M]         |
|k&J)2Vo(k*[^c"Mg*f%) |[IND,SGP]       |[F]         |
+---------------------+----------------+------------+

The output I am looking for is:

+------------+--------------------------------+
|field_a     |collect_set(field_b)            |
+------------+--------------------------------+
|[F]         |[IDN,IND,SGP,BHR,MYS,PHL]       |
|[M]         |[IND,KWT,IDN,AUS,SAU,KWT]       |
+------------+--------------------------------+

Solution

I wrote a solution to your problem using a pandas UDF. I did not understand why your field_a column (representing gender?) is a list, so I turned it into a simple string, but you can make it a list of strings if you want. Here it is:

(1) Create dummy df in pandas and make a spark DataFrame:

import pandas as pd
import random
from pyspark.sql.functions import pandas_udf, PandasUDFType

a_list = ['F', 'M']
b_list = ['IDN', 'IND', 'SGP', 'BHR', 'MYS', 'PHL', 'AUS', 'SAU', 'KWT']
size = 10
dummy_df = pd.DataFrame({'docId': [random.randint(0,100) for _ in range(size)],
                         'field_b': [[random.choice(b_list), random.choice(b_list)] for _ in range(size)],
                         'field_a': [random.choice(a_list) for _ in range(size)]})

# assumes an existing SparkSession named `spark` (e.g. SparkSession.builder.getOrCreate())
df = spark.createDataFrame(dummy_df)

producing:

+-----+-------+----------+
|docId|field_a|   field_b|
+-----+-------+----------+
|   23|      F|[SAU, SGP]|
|   36|      F|[IDN, PHL]|
|   82|      M|[BHR, SAU]|
|   30|      F|[AUS, IDN]|
|   75|      F|[AUS, MYS]|
|   46|      F|[SAU, IDN]|
|   11|      F|[SAU, BHR]|
|   71|      M|[KWT, IDN]|
|   50|      F|[IND, SGP]|
|   78|      F|[IND, SGP]|
+-----+-------+----------+

(2) Then define pandas UDF, group and apply:

# grouped-map pandas UDF: each group arrives as a pandas DataFrame, and one row per group is returned
@pandas_udf('field_a string, set_field_b array<string>', PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    # flatten the field_b lists and keep only the distinct values
    unique_values = pd.DataFrame(df['field_b'].values.tolist()).stack().unique().tolist()
    return pd.DataFrame({'field_a': df['field_a'].iloc[0], 'set_field_b': [unique_values]})

result = df.groupby('field_a').apply(my_pandas_udf)

yielding the final result:

+-------+--------------------+
|field_a|         set_field_b|
+-------+--------------------+
|      F|[SAU, SGP, IDN, P...|
|      M|[BHR, SAU, KWT, IDN]|
+-------+--------------------+
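
If you are on Spark 3.x, the GROUPED_MAP pandas UDF style above is deprecated in favour of applyInPandas; a minimal sketch of the same grouping with that API (same function body, function name chosen here only for illustration) would be:

# same logic as my_pandas_udf, but as a plain function passed to applyInPandas (Spark 3+)
def collect_unique_field_b(pdf):
    unique_values = pd.DataFrame(pdf['field_b'].values.tolist()).stack().unique().tolist()
    return pd.DataFrame({'field_a': pdf['field_a'].iloc[0], 'set_field_b': [unique_values]})

result = df.groupby('field_a').applyInPandas(
    collect_unique_field_b, schema='field_a string, set_field_b array<string>')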

I don't really like the pandas values/tolist/stack/unique approach; maybe there's a better way to do it, but handling lists inside pandas dataframes is generally not straightforward.
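
One possible alternative (just a sketch, not benchmarked against the version above) is to flatten the lists with itertools.chain and deduplicate with dict.fromkeys, which keeps the order in which values are first seen:

from itertools import chain

# inside the UDF body, replacing the values/tolist/stack/unique line
unique_values = list(dict.fromkeys(chain.from_iterable(df['field_b'])))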

Now you have to compare the performance with the explode + groupby + collect_set approach; I'm not sure which one will be faster. Tell us when you find out!
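
For reference, a minimal sketch of that explode-based approach (my own illustration, not code from the question) could look like this:

from pyspark.sql import functions as F

# explode field_b into one row per value, then collect the distinct values per group
result_explode = (df
                  .withColumn('field_b_value', F.explode('field_b'))
                  .groupBy('field_a')
                  .agg(F.collect_set('field_b_value').alias('set_field_b')))

On Spark 2.4+ you could also skip the explode entirely with the built-in array functions, e.g. F.array_distinct(F.flatten(F.collect_list('field_b'))) inside the agg, which may be worth adding to the comparison.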

