TypeError: Column is not iterable - How to iterate over ArrayType()?


Problem description


Consider the following DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

Which can be created with the following code:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

Is there a way to directly modify the ArrayType() column "names" by applying a function to each element, without using a udf?

For example, suppose I wanted to apply the function foo to the "names" column. (I will use the example where foo is str.upper just for illustrative purposes, but my question is regarding any valid function that can be applied to the elements of an iterable.)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: Column is not iterable

I could do this using a udf:

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

In this specific example, I could avoid the udf by exploding the column, calling pyspark.sql.functions.upper(), and then using groupBy and collect_list:

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

But this is a lot of code to do something simple. Is there a more direct way to iterate over the elements of an ArrayType() using spark-dataframe functions?

Solution

In Spark < 2.4 you can use a user-defined function:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

def transform(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
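
Since the element type is a parameter, the same transform factory works for arrays of other types as well. A minimal sketch, assuming a hypothetical DataFrame with an array<int> column nums:

from pyspark.sql.types import IntegerType

# Hypothetical data; 'nums' is an array<int> column.
df2 = sqlCtx.createDataFrame([(1, [1, 2, 3])], ["id", "nums"])

# Reuse the transform() factory defined above with a non-default element type.
double_udf = transform(lambda x: x * 2, IntegerType())
df2.withColumn('nums', double_udf(f.col('nums'))).show()
#+---+---------+
#| id|     nums|
#+---+---------+
#|  1|[2, 4, 6]|
#+---+---------+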

Considering the high cost of the explode + collect_list idiom, this approach is almost exclusively preferred, despite its intrinsic cost.

In Spark 2.4 or later you can use transform* with upper (see SPARK-23909):

from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
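
In Spark 3.1 or later, transform is also exposed directly in pyspark.sql.functions, so the lambda can be written in Python instead of embedding a SQL string (a sketch equivalent to the expr version above):

# Spark 3.1+ only: f.transform takes the column plus a Python lambda
# mapping Column -> Column.
df.withColumn('names', f.transform('names', lambda x: f.upper(x))).show(truncate=False)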

It is also possible to use a pandas_udf:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)

df.withColumn('names', foo_udf_pandas(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

although only the latest Arrow / PySpark combinations support handling ArrayType columns (SPARK-24259, SPARK-21187). Nonetheless, this option should be more efficient than a standard UDF (especially with its lower serde overhead) while still supporting arbitrary Python functions.
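
On Spark 3.0 or later, the same pandas_udf can also be written in the type-hint style instead of PandasUDFType; a short sketch, assuming a compatible pyarrow is installed:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark 3.0+ type-hint style; each Series element holds one array cell.
@pandas_udf('array<string>')
def upper_all(s: pd.Series) -> pd.Series:
    return s.apply(lambda xs: [x.upper() for x in xs] if xs is not None else xs)

df.withColumn('names', upper_all('names')).show(truncate=False)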


* A number of other higher-order functions are also supported, including, but not limited to, filter and aggregate; see the sketch below.
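
For illustration, a sketch of those two applied to the same names column (Spark 2.4+; the output aliases filtered and joined are made up here):

df.selectExpr(
    "type",
    # Keep only the names that are not 'sam'.
    "filter(names, x -> x != 'sam') AS filtered",
    # Fold the array into one concatenated string.
    "aggregate(names, '', (acc, x) -> concat(acc, x)) AS joined"
).show(truncate=False)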
