TypeError: Column is not iterable - How to iterate over ArrayType()?

Question
Consider the following DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
which can be created with the following code:
import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Is there a way to directly modify the ArrayType() column "names" by applying a function to each element, without using a udf?
For example, suppose I wanted to apply the function foo to the "names" column. (I will use the example where foo is str.upper just for illustrative purposes, but my question is about any valid function that can be applied to the elements of an iterable.)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: Column is not iterable
I can use a udf:
from pyspark.sql.types import ArrayType, StringType

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
In this specific example, I could avoid the udf by exploding the column, calling pyspark.sql.functions.upper(), and then using groupBy and collect_list:
df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
But this is a lot of code to do something simple. Is there a more direct way to iterate over the elements of an ArrayType() using Spark DataFrame functions?
Answer
In Spark < 2.4 you can use a user-defined function:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Considering the high cost of the explode + collect_list idiom, this approach is almost always preferred, despite its intrinsic cost.
In Spark 2.4 or later you can use transform* with upper (see SPARK-23909):
from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
It is also possible to use pandas_udf:
from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf_pandas(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
although only the latest Arrow / PySpark combinations support handling ArrayType columns (SPARK-24259, SPARK-21187). Nonetheless, this option should be more efficient than a standard UDF (especially with lower serde overhead) while supporting arbitrary Python functions.
* A number of other higher-order functions are also supported, including, but not limited to, filter and aggregate. See for example:
- Querying Spark SQL DataFrame with complex types
- How to slice and sum elements of array column?
- Filter array column content
- Spark Scala row-wise average by handling null.
- How to use transform higher-order function?.