TypeError: Column is not iterable - How to iterate over ArrayType()?
Problem description
Consider the following DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
Which can be created with the following code:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Is there a way to directly modify the ArrayType() column "names" by applying a function to each element, without using a udf?
For example, suppose I wanted to apply the function foo to the "names" column. (I will use the example where foo is str.upper just for illustrative purposes, but my question is regarding any valid function that can be applied to the elements of an iterable.)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: Column is not iterable
I could do this using a udf:
from pyspark.sql.types import ArrayType, StringType
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
In this specific example, I could avoid the udf by exploding the column, calling pyspark.sql.functions.upper(), and then using groupBy and collect_list:
df.select('type', f.explode('names').alias('name'))\
.withColumn('name', f.upper(f.col('name')))\
.groupBy('type')\
.agg(f.collect_list('name').alias('names'))\
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
But this is a lot of code to do something simple. Is there a more direct way to iterate over the elements of an ArrayType() using spark-dataframe functions?
In Spark < 2.4 you can use a user-defined function:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
    # Wrap an arbitrary element-wise Python function f in an array-returning udf.
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))

    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Considering the high cost of the explode + collect_list idiom, this approach is almost exclusively preferred, despite its intrinsic cost.
In Spark 2.4 or later you can use transform* with upper (see SPARK-23909):
from pyspark.sql.functions import expr
df.withColumn(
'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
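Since Spark 3.1, the same SQL higher-order function is also exposed directly in the Python API as pyspark.sql.functions.transform, so the expr() string can be avoided. A minimal sketch, assuming Spark 3.1+ and the f alias from the question:

# Spark 3.1+: the SQL transform() higher-order function has a Python wrapper,
# taking a Column-level lambda instead of an expr() string.
df.withColumn('names', f.transform('names', lambda x: f.upper(x))).show(truncate=False)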
It is also possible to use pandas_udf:
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
    # Same idea as transform above, but vectorised: xs is a pandas Series
    # whose elements are Python lists (or None).
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))

    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf_pandas(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
although only the latest Arrow / PySpark combinations support handling ArrayType columns (SPARK-24259, SPARK-21187). Nonetheless, this option should be more efficient than a standard UDF (especially with lower serde overhead) while supporting arbitrary Python functions.
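Note that in Spark 3.x the PandasUDFType.SCALAR style used above is deprecated in favour of pandas_udf with Python type hints. A minimal equivalent sketch, assuming Spark 3.0+ (upper_names is just an illustrative name):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

# Type-hinted scalar pandas UDF: receives and returns a pandas Series
# whose elements are Python lists (or None).
@pandas_udf(ArrayType(StringType()))
def upper_names(xs: pd.Series) -> pd.Series:
    return xs.apply(lambda arr: [x.upper() for x in arr] if arr is not None else arr)

df.withColumn('names', upper_names(f.col('names'))).show(truncate=False)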
* A number of other higher-order functions are also supported, including, but not limited to, filter and aggregate (a short sketch follows the list below). See for example:
- Querying Spark SQL DataFrame with complex types
- How to slice and sum elements of array column?
- Filter array column content
- Spark Scala row-wise average by handling null
- How to use transform higher-order function?
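As a quick illustration of the footnote above, filter and aggregate can be invoked the same way through expr. A minimal sketch against the same df (long_names and total_chars are just illustrative column names):

from pyspark.sql.functions import expr

# Keep only names longer than 4 characters, and sum the lengths of all names.
df.withColumn('long_names', expr('filter(names, x -> length(x) > 4)')) \
  .withColumn('total_chars', expr('aggregate(names, 0, (acc, x) -> acc + length(x))')) \
  .show(truncate=False)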