PySpark trying to apply previous field's schema to next field

Question

Having this weird issue with PySpark: as it processes the data, it seems to be trying to apply the previous field's schema to the next field.

Simplest test case I could come up with:

%pyspark
from pyspark.sql.types import (
    DateType,
    StructType,
    StructField,
    StringType,
)

from datetime import date
from pyspark.sql import Row


schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)

test = spark.createDataFrame(
    [
        Row(
            date=date(2019, 1, 1),
            country="RU",
        ),
    ],
    schema
)

Stacktrace:

Fail to execute line 26:     schema
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8579306903394369208.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 26, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 691, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 423, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in toInternal
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in <genexpr>
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 439, in toInternal
    return self.dataType.toInternal(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 175, in toInternal
    return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'str' object has no attribute 'toordinal'

Bonus information from running it locally rather than in Zeppelin:

self = DateType, d = 'RU'

    def toInternal(self, d):
        if d is not None:
>           return d.toordinal() - self.EPOCH_ORDINAL
E           AttributeError: 'str' object has no attribute 'toordinal'

In other words, it's trying to apply DateType to country. If I get rid of date, it's fine. If I get rid of country, it's fine. Both together is a no-go.

Any ideas? Am I missing something obvious?

Answer

If you're going to use a list of Rows, you don't need to specify the schema as well. This is because the Row already knows the schema.

The problem is happening because the pyspark.sql.Row object does not maintain the order that you specified for the fields.

print(Row(date=date(2019, 1, 1), country="RU"))
#Row(country='RU', date=datetime.date(2019, 1, 1))

From the docs:

Row can be used to create a row object by using named arguments, the fields will be sorted by names.

As you can see, the country field is put first. When Spark tries to create the DataFrame with the specified schema, it expects the first item to be a DateType.
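
You can reproduce the last frame of the traceback in isolation. This is just an illustrative check, not part of the original answer: DateType.toInternal calls .toordinal() on whatever value it receives, so handing it the misplaced country string fails in exactly the same way.

from pyspark.sql.types import DateType

# DateType.toInternal assumes it received a datetime.date;
# the misaligned string "RU" triggers the same AttributeError
DateType().toInternal("RU")
# AttributeError: 'str' object has no attribute 'toordinal'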

One way to fix this is to put the fields in your schema in alphabetical order:

schema = StructType(
    [
        StructField("country", StringType(), True),
        StructField("date", DateType(), True)
    ]
)

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ],
    schema
)
test.show()
#+-------+----------+
#|country|      date|
#+-------+----------+
#|     RU|2019-01-01|
#+-------+----------+

Or in this case, there's no need to even pass in the schema to createDataFrame. It will be inferred from the Rows:

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ]
)
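
As a quick check (an illustrative addition, assuming the same session as above), printSchema() confirms the inferred types, with the fields in Row's sorted order:

test.printSchema()
#root
# |-- country: string (nullable = true)
# |-- date: date (nullable = true)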

And if you wanted to reorder the columns, use select:

test = test.select("date", "country")
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+
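
As an aside (this alternative is not from the original answer, but it is a common pattern): plain tuples have no field names, so Spark matches their values to the schema purely by position and no sorting occurs. That means the original date-first schema works unchanged, reusing the imports from the question:

schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)

# Values in a tuple are matched to the schema by position,
# so "date" stays first and "country" second
test = spark.createDataFrame(
    [
        (date(2019, 1, 1), "RU"),
    ],
    schema
)
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+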
