TypeError: Column is not iterable - How to iterate over ArrayType()?


Problem Description

Consider the following DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

which can be created with the following code:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

# sqlCtx is assumed to be an existing SQLContext / SparkSession entry point
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

Is there a way to directly modify the ArrayType() column "names" by applying a function to each element, without using a udf?

For example, suppose I wanted to apply the function foo to the "names" column. (I will use the example where foo is str.upper just for illustrative purposes, but my question is regarding any valid function that can be applied to the elements of an iterable.)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: Column is not iterable

I can use a udf:

from pyspark.sql.types import ArrayType, StringType

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

In this specific example, I could avoid the udf by exploding the column, calling pyspark.sql.functions.upper(), and then using groupBy and collect_list:

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

But this is a lot of code to do something simple. Is there a more direct way to iterate over the elements of an ArrayType() using spark-dataframe functions?

Recommended Answer

Spark< 2.4 ,您可以使用用户定义的功能:

In Spark < 2.4 you can use an user defined function:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType

# Factory that wraps a plain Python function f into a udf returning ArrayType(t)
def transform(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _

foo_udf = transform(str.upper)

df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

Considering the high cost of the explode + collect_list idiom, this approach is almost exclusively preferred, despite its intrinsic cost.

In Spark 2.4 or later you can use transform* with upper (see SPARK-23909):

from pyspark.sql.functions import expr

df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
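
In Spark 3.1 or later the same higher-order function is also exposed directly in the Python API as pyspark.sql.functions.transform, so the SQL expression string can be avoided. A minimal sketch, assuming a Spark 3.1+ runtime:

from pyspark.sql.functions import transform, upper

# transform applies the lambda to every element of the array column;
# the lambda receives each element as a Column (requires Spark >= 3.1)
df.withColumn('names', transform('names', lambda x: upper(x))).show(truncate=False)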

It is also possible to use pandas_udf:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Same factory, but returning a scalar pandas_udf that maps over a pandas Series of arrays
def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
        raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _

foo_udf_pandas = transform_pandas(str.upper)

df.withColumn('names', foo_udf_pandas(f.col('names'))).show(truncate=False)

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+

although only the latest Arrow / PySpark combinations support handling ArrayType columns (SPARK-24259, SPARK-21187). Nonetheless, this option should be more efficient than a standard UDF (especially with lower serde overhead) while supporting arbitrary Python functions.

* A number of other higher-order functions are also supported, including, but not limited to, filter and aggregate (a short sketch follows the list below). See for example:

  • Querying Spark SQL DataFrame with complex types
  • How to slice and sum elements of array column?
  • Filter array column content
  • Spark Scala row-wise average by handling null.
  • How to use transform higher-order function?
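
For instance, a minimal, hypothetical sketch of filter and aggregate applied to the same "names" column via expr, assuming Spark 2.4+ (the output column names long_names and joined_names are made up for illustration):

from pyspark.sql.functions import expr

df.withColumn(
    'long_names', expr('filter(names, x -> length(x) > 4)')                        # keep only elements longer than 4 characters
).withColumn(
    'joined_names', expr("aggregate(names, '', (acc, x) -> concat(acc, ' ', x))")  # fold the array into a single string
).show(truncate=False)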
