PySpark trying to apply previous field's schema to next field


Question

Having this weird issue with PySpark. It seems to be trying to apply the schema for the previous field to the next field as it's processing.

Simplest test case I could come up with:

%pyspark
from pyspark.sql.types import (
    DateType,
    StructType,
    StructField,
    StringType,
)

from datetime import date
from pyspark.sql import Row


schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)

test = spark.createDataFrame(
    [
        Row(
            date=date(2019, 1, 1),
            country="RU",
        ),
    ],
    schema
)

Stack trace:

Fail to execute line 26:     schema
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8579306903394369208.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 26, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 691, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 423, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in toInternal
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in <genexpr>
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 439, in toInternal
    return self.dataType.toInternal(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 175, in toInternal
    return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'str' object has no attribute 'toordinal'

Bonus information from running it locally rather than in Zeppelin:

self = DateType, d = 'RU'

    def toInternal(self, d):
        if d is not None:
>           return d.toordinal() - self.EPOCH_ORDINAL
E           AttributeError: 'str' object has no attribute 'toordinal'

e.g., it's trying to apply DateType to country. If I get rid of date, it's fine. If I get rid of country, it's fine. Both together is a no-go.

Any ideas? Am I missing something obvious?

Answer

If you're going to use a list of Rows, you don't need to specify the schema as well. This is because the Row already knows the schema.

The problem is happening because the pyspark.sql.Row object does not maintain the order that you specified for the fields.

print(Row(date=date(2019, 1, 1), country="RU"))
#Row(country='RU', date=datetime.date(2019, 1, 1))
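
Worth noting (this goes beyond the original answer): the alphabetical sorting is the behavior of the Spark 2.x line visible in the traceback. As of Spark 3.0, running on Python 3.6+, Row preserves the order of the named arguments instead of sorting them.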

From the docs:

Row can be used to create a row object by using named arguments, the fields will be sorted by names.

As you can see, the country field is being put first. When Spark tries to create the DataFrame with the specified schema, it expects the first item to be a DateType.
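
To make the mismatch concrete, here is a small sketch (not from the original answer) that replays the pairing the traceback points at: StructType.toInternal zips the schema fields positionally against the (sorted) Row values, so the DateType field receives 'RU':

row = Row(date=date(2019, 1, 1), country="RU")  # stored sorted: (country, date)
for field, value in zip(schema.fields, row):
    print(field.name, field.dataType.simpleString(), repr(value))
#date date 'RU'
#country string datetime.date(2019, 1, 1)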

One way to fix this is to put the fields in your schema in alphabetical order:

schema = StructType(
    [
        StructField("country", StringType(), True),
        StructField("date", DateType(), True)
    ]
)

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ],
    schema
)
test.show()
#+-------+----------+
#|country|      date|
#+-------+----------+
#|     RU|2019-01-01|
#+-------+----------+
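
As a side note beyond the original answer, you can also sidestep Row's sorting entirely by passing plain tuples, which are purely positional, so the question's original (date, country) schema applies as written. A minimal sketch:

test = spark.createDataFrame(
    [(date(2019, 1, 1), "RU")],  # values in the same order as the schema fields
    StructType([
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ])
)
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+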

Or in this case, there's no need to even pass in the schema to createDataFrame. It will be inferred from the Rows:

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ]
)
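
You can check what was inferred with printSchema; assuming the same Spark 2.x field sorting as above, the inferred schema should come out as:

test.printSchema()
#root
# |-- country: string (nullable = true)
# |-- date: date (nullable = true)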

And if you wanted to reorder the columns, use select:

test = test.select("date", "country")
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+
