Applying UDFs on GroupedData in PySpark (with functioning python example)

Problem description

I have this python code that runs locally in a pandas dataframe:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name)))

I would like to run this in PySpark, but I am having trouble dealing with the pyspark.sql.group.GroupedData object.

I've tried the following:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 

which returns

KeyError: 'A'

I presume this is because 'A' is no longer a column, and I can't find an equivalent for x.name.

And then

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()

but get the following error:

AttributeError: 'GroupedData' object has no attribute 'map'

Any suggestions would be really appreciated!

Solution

What you are trying to do is write a UDAF (User Defined Aggregate Function) as opposed to a UDF (User Defined Function). UDAFs are functions that work on data grouped by a key. Specifically, they need to define how to merge multiple values in the group in a single partition, and then how to merge the results across partitions for each key. There is currently no way in Python to implement a UDAF; they can only be implemented in Scala.

But you can work around it in Python. You can use collect_list to gather your grouped values and then use a regular UDF to do what you want with them. The only caveat is that collect_list only works on primitive values, so you will need to encode them down to a string.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = val.split(',')
        # do something with b and c

    return <whatever>  # must return a string, since the UDF is declared as StringType

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data')) \
  .withColumn('data', myUdf('data'))
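
As a rough illustration of what the UDF is handed per key, here is a small sketch with made-up toy data; the SparkSession named spark, the column values, and the variable names below are assumptions for illustration only:

# Hypothetical toy data, only to show the shape of what myUdf receives per key.
toy_df = spark.createDataFrame(
    [('k1', '1', 'x'), ('k1', '2', 'y'), ('k2', '3', 'z')],
    ['A', 'B', 'C'])

grouped = (toy_df.withColumn('data', concat_ws(',', col('B'), col('C')))
                 .groupBy('A')
                 .agg(collect_list('data').alias('data')))

# For key 'k1' the 'data' column is now a list like ['1,x', '2,y'],
# which is exactly the list that myFunc iterates over and splits back apart.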

Use collect_set if you want deduping. Also, if you have lots of values for some of your keys, this will be slow because all values for a key will need to be collected in a single partition somewhere on your cluster. If your end result is a value you build by combining the values per key in some way (for example summing them) it might be faster to implement it using the RDD aggregateByKey method which lets you build an intermediate value for each key in a partition before shuffling data around.
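
To make the aggregateByKey route concrete, here is a minimal sketch, assuming the values in B are numeric and a plain sum is the combination you want; the column names and the summing are illustrative assumptions, not part of the original answer:

# Minimal sketch: combine the values of B per key A with RDD aggregateByKey,
# assuming a sum is the desired combination (hypothetical columns 'A' and 'B').
pair_rdd = df.select('A', 'B').rdd.map(lambda row: (row['A'], row['B']))

summed = pair_rdd.aggregateByKey(
    0,                        # zeroValue: initial accumulator for each key in each partition
    lambda acc, v: acc + v,   # seqOp: fold one value into the partition-local accumulator
    lambda a, b: a + b)       # combOp: merge accumulators coming from different partitions

# summed is an RDD of (key, combined_value) pairs, built without first collecting
# all of a key's raw values into a single partition.
result = summed.toDF(['A', 'B_sum'])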

EDIT: 11/21/2018

Since this answer was written, PySpark has added support for UDAFs using Pandas. There are some nice performance improvements when using Pandas UDFs and UDAFs over straight Python functions with RDDs. Under the hood it vectorizes the columns (batches the values from multiple rows together to optimize processing and compression). Take a look here for a better explanation, or look at user6910411's answer below for an example.
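
For reference, here is a minimal sketch of the Pandas grouped-map approach on Spark 2.3+; the output schema, column names, and the reuse of the asker's myFunction are assumptions for illustration (on Spark 3.x the same thing is written as sparkDF.groupby('A').applyInPandas(my_grouped_func, schema)):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType

# Assumed output schema; adjust it to whatever your function actually produces.
schema = StructType([
    StructField('A', StringType()),
    StructField('result', StringType())])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def my_grouped_func(pdf):
    # pdf is a pandas DataFrame containing every row for one value of 'A',
    # so the original pandas-style logic can run essentially unchanged.
    key = pdf['A'].iloc[0]
    res = myFunction(zip(pdf['B'], pdf['C']), key)  # the asker's original function
    return pd.DataFrame({'A': [key], 'result': [str(res)]})

result = sparkDF.groupby('A').apply(my_grouped_func)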
