将运行时7.3LTS(Spark3.0.1)升级到9.1LTS(Spark3.1.2)后创建PySpark DataFrame数据库时,json文件中的重复列引发错误 [英] Duplicate column in json file throw error when creating PySpark dataframe Databricks after upgrading runtime 7.3LTS(Spark3.0.1) to 9.1LTS(Spark3.1.2)

查看:20
本文介绍了将运行时7.3LTS(Spark3.0.1)升级到9.1LTS(Spark3.1.2)后创建PySpark DataFrame数据库时,json文件中的重复列引发错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题陈述:升级数据库运行时版本时,创建数据帧时出现重复列引发错误。在较低的运行时中,将创建数据帧,并且由于下游不需要重复列,因此在SELECT中简单地将其排除。

文件位置:存储在ADLS Gen2(Azure)上的JSON文件。 集群模式:标准

代码: 我们在Azure数据库中阅读了它,如下所示。

intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
Json文件是嵌套的,其中之一下面出现了tags,这是重复的列(如下图所示)。 在读取DataFrame之后,我们选择所需的列。我们无论如何都不需要此副本tags

以前,我们在Databricks运行时7.3LTS(Spark3.0.1)上运行,它在其中创建包括重复列的数据帧,但由于我们没有进一步使用它,因此没有造成任何影响。

但是,我们现在正在升级到运行时9.1LTS(Spark3.1.2),它在创建DataFrame本身时抛出关于列重复的错误。 错误消息:Found duplicate column(s) in the data schema: `tags`

PIC复制栏:- Duplicate column in json file: tags. Dataframe was created successfully in runtime 7.3LTS(Spark3.0.1)

结论: 我试着一读到DataFrame就选择列,但没有成功。 我有一种预感,因为现在升级的数据库运行时版本在默认情况下更倾向于增量表(增量表不支持其中的重复列),所以我们可能必须关闭一个属性,以便在整个笔记本中或仅在读取数据帧时忽略此检查。

虽然此完全错误发生在json上,但我相信,如果其他文件格式(如CSV)具有重复的列,则可能会发生此错误。

该文件非常嵌套,为所有必需的列定义架构并不是很实用,因为在将来需要更多列的情况下(这将是次要解决方案),这很繁琐且容易出错。文件是由供应商使用自动化过程生成的,预计所有文件都将保持与已交付的历史文件相同的格式。

运行时9.1LTS(Spark3.1.2)出现完全错误:

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

编辑:对预先定义架构的评论。

推荐答案

不同的好建议可能对具体情况有所帮助。

正如@ScootCork所指出的,预先定义模式会有所帮助,因为Spark不必自己创建模式。但是,我的文件非常大,而且嵌套得很重,因为手动定义模式会很麻烦。

最后我确实使用了架构,但找到了一种解决方法,因此我不必手动创建它。 即使有重复的列,我也能够在7.3 LTS运行时中创建数据帧,如原始问题中所述。因此,我读取了这个运行时上的一个文件,并将其写入ADLS Gen2(您可以将其存储在任何位置)。这是一次性的活动,现在您可以在每次运行代码时读回此文件(在读回此代码时,MULTLINE不需要为真),使用.schema获取其模式,并使用此模式读取新的json文件。因为Spark本身不必推断模式,所以它不会对重复的列抛出错误。请注意,重复列仍然存在,如果您尝试使用它,将出现ambiguous错误。但是,如果由于切变大小和复杂的JSON结构而手动定义模式不是很实用,并且如果复制的列没有用处,则此方法非常有用。说明如下:-

7.3 LTS运行时的一次性活动

# Few columns were coming as duplicate in raw file. e.g.: languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused errror while creating dataframe.
# However, we are able to read it in Databricks Runtime 7.3 LTS. Hence used this runtime to read a file and write it to ADLS as ONE-TIME activity.
# For all further runs, this file can be read using multiline as false, then use its schema while reading the other new files (which in this case needs multiline as true). In this way spark does not have to create schema on its own hence does not throw error eben in higher runtime versions.
# Have used a historical file initially delivered which had a lot of records due to historical data. This ensures we cover all possibilities.
# Can be created again using 7.3 LTS runtime cluster if this schema is deleted. 

dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile) # Can take any file to creat sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)

现在将此写入文件用于将来的所有运行,即使在更高的运行时也是如此。

# Read sample which was created using 7.3 LTS runtime.
# The multiline does NOT have to be true for this.
# Get its schema and use it to read new files even on higher runtime without error which was caused due to duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema

# Read new json files using this schema by using `.schema()`. Works on higher runtimes as well since spark now does not have to create schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")

这篇关于将运行时7.3LTS(Spark3.0.1)升级到9.1LTS(Spark3.1.2)后创建PySpark DataFrame数据库时,json文件中的重复列引发错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆