Implicit schema for pandas_udf in PySpark?

Question

This answer nicely explains how to use PySpark's groupby and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually, as shown in this part of the example:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

since I will be returning 100+ columns with names that are automatically generated. Is there any way to tell PySpark to just implicitly use the schema returned by my function and assume it will be the same on all worker nodes? The schema will also change between runs, since I will be experimenting with which predictors to use, so an automated process for schema generation might be an option...
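
For reference, here is a minimal sketch of how such a manually declared schema plugs into a grouped pandas_udf (assuming the Spark 2.3+ GROUPED_MAP API used by the linked answer; the input frame df and its key and min columns are stand-ins, not from the original):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def avg_min(pdf):
    # pdf is one group as a pandas DataFrame; the returned frame's columns
    # must match `schema` exactly ("key" and "avg_min" here).
    return pd.DataFrame({"key": [pdf["key"].iloc[0]],
                         "avg_min": [pdf["min"].mean()]})

result = df.groupby("key").apply(avg_min)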

Answer

Based on Sanxofon's comment, I got an idea on how to implement this myself:

from pyspark.sql.types import *

# Map pandas dtype names to Spark type constructors.
# Incomplete - extend with the types you need.
mapping = {"float64": DoubleType,
           "object": StringType,
           "int64": LongType}  # pandas int64 is 64-bit, so LongType, not IntegerType

def createUDFSchemaFromPandas(dfp):
    # One StructField per column, with the Spark type looked up from the
    # column's pandas dtype.
    column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
    return StructType(column_types)

What I do is take a sample pandas df, pass it to the function, and build the schema from what it returns:

dfp = df_total.limit(100).toPandas()           # small pandas sample of the Spark frame
df_return = my_UDF_function(dfp)               # run the plain-pandas function on the sample
schema = createUDFSchemaFromPandas(df_return)  # infer the Spark schema from its output

This seems to work for me. The problem is that it is somewhat circular: you need to run the function to get the schema, and you need the schema to define the UDF. I solved this by creating a "wrapper" UDF that simply passes the dataframe through, sketched below.
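
A rough sketch of that wrapper, under the same assumptions as above (my_UDF_function and df_total are from the snippets above; the grouping column some_key is a placeholder):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Infer the schema once, up front, from a small sample...
sample_out = my_UDF_function(df_total.limit(100).toPandas())
schema = createUDFSchemaFromPandas(sample_out)

# ...then declare a wrapper UDF that just delegates to the plain-pandas
# function, which itself stays schema-agnostic.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def wrapper_udf(pdf):
    return my_UDF_function(pdf)

result = df_total.groupby("some_key").apply(wrapper_udf)

One caveat: the schema is inferred from the sample's pandas dtypes, so the sample has to produce the same dtypes as the full data (an all-null column in a small sample, for instance, may be inferred as object).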
