Pyspark:pandas_udf、grouped_agg 的多个参数 [英] Pyspark: multiple parameters for pandas_udf, grouped_agg
本文介绍了Pyspark:pandas_udf、grouped_agg 的多个参数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试应用带有两个参数的 pandas_udf.但我有这个错误.首先,我尝试使用一个参数,没问题:
I am trying to apply a pandas_udf, with two parameters. But I've got this error. First I try with one parameter and it's ok:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.config('spark.cores.max', 100) \
.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
这是数据的样子
+---+----+
| id| v|
+---+----+
| 1| 1.0|
| 1| 2.0|
| 2| 3.0|
| 2| 5.0|
| 2|10.0|
+---+----+
我的 pandas_udf 函数是
My pandas_udf function is
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf(v):
cond = v<=3
res = v[cond].count()
return res
df.groupby("id").agg(count_udf(df['v'])).show()
结果是
+---+------------+
| id|count_udf(v)|
+---+------------+
| 1| 2.0|
| 2| 1.0|
+---+------------+
但是当我尝试为 pandas_udf 函数输入两个参数时,出现错误.
But when I try to put two parameters for the pandas_udf function as follow, I have an error.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf2(v, value):
cond = v<=value
res = v[cond].count()
return res
df.groupby("id").agg(count_udf(df['v'],4)).show()
错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3267, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-18-468499490a1f>", line 1, in <module>
res = df.groupby("id").agg(count_udf(df['v'],4))
File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 189, in wrapper
return self(*args)
File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 169, in __call__
return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in _to_seq
cols = [converter(c) for c in cols]
File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in <listcomp>
cols = [converter(c) for c in cols]
File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 53, in _to_java_column
"function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: 4 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
推荐答案
您可以在与调用函数相同的范围内定义一个 pandas_udf 函数.所以所有局部变量都将在其中可见.
You can define a pandas_udf function in the same scope with a calling function. So all local variables will be visible in it.
例如:
def wrapper_count_udf():
value = 4
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf(v):
cond = v<=value
res = v[cond].count()
return res
df.groupby("id").agg(count_udf(df['v'])).show()
这篇关于Pyspark:pandas_udf、grouped_agg 的多个参数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文