Duplicate column in json file throws error when creating PySpark dataframe in Databricks after upgrading runtime 7.3 LTS (Spark 3.0.1) to 9.1 LTS (Spark 3.1.2)
Problem description
Code: We read the file in Azure Databricks as follows:
intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
The JSON file is nested, and within one of the nested structures `tags` appears as a duplicate column (shown in the image below). After reading the DataFrame we select only the columns we need; we do not use this duplicate `tags` at all.
Previously we ran on Databricks runtime 7.3 LTS (Spark 3.0.1), which created the dataframe including the duplicate column; since we did not use it further, it caused no harm.
However, we are now upgrading to runtime 9.1 LTS (Spark 3.1.2), and it throws an error about the duplicate column while creating the DataFrame itself. Error message: Found duplicate column(s) in the data schema: `tags`
Image of the duplicate column:
Duplicate column in json file: tags. Dataframe was created successfully in runtime 7.3LTS(Spark3.0.1)
Conclusion: I tried selecting the required columns immediately after reading the DataFrame, but without success. My hunch is that, since the upgraded Databricks runtime now favors Delta tables by default (and Delta tables do not support duplicate columns), there may be a property we can turn off in order to skip this check, either throughout the notebook or just while reading the dataframe.
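For what it's worth, the only related knob I am aware of is `spark.sql.caseSensitive` (an assumption on my part, not a confirmed fix): it makes the duplicate-column check case-sensitive, so it can only help when the "duplicate" names differ by letter case (e.g. `tags` vs `Tags`), not when they are identical as in our file:

```python
# Assumption: this only helps if the duplicate names differ in case (tags vs Tags).
# With identical names, as in our file, the check still fails.
spark.conf.set("spark.sql.caseSensitive", "true")
intermediate_df = spark.read.option("multiline", "true").json(f"{path}/IN-109418_Part_1.json")
```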
Although this exact error occurred with JSON, I believe it could also occur with other file formats, such as CSV, if they contain duplicate columns.
The file is heavily nested, so defining a schema for all the required columns is not very practical: it is tedious and error-prone, especially if more columns are needed in the future (that would be my fallback solution). The files are generated by a vendor through an automated process, and all future files are expected to keep the same format as the historical files already delivered.
Full error on runtime 9.1 LTS (Spark 3.1.2):
AnalysisException Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
370 path = [path]
371 if type(path) == list:
--> 372 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
373 elif isinstance(path, RDD):
374 def func(iterator):
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
AnalysisException: Found duplicate column(s) in the data schema: `tags`
Edit: see the comments regarding defining the schema beforehand.
Recommended answer
Several good suggestions were made; which one helps may depend on the specific situation.
As @ScootCork pointed out, defining the schema beforehand helps because Spark then does not have to infer it on its own. However, my file was very large and heavily nested, so defining the schema by hand would have been cumbersome.
In the end I did use a schema, but found a workaround so that I did not have to create it manually.
As described in the original question, I was able to create the dataframe on the 7.3 LTS runtime even with the duplicate column. So I read one file on that runtime and wrote it to ADLS Gen2 (you can store it anywhere). This is a one-time activity; on every later run you read this file back (multiline does not need to be true when reading it back), get its schema with `.schema`, and use that schema to read the new JSON files. Since Spark does not have to infer the schema itself, it no longer throws the duplicate-column error. Note that the duplicate column still exists, and you will get an `ambiguous` error if you try to use it. But if manually defining the schema is impractical due to the sheer size and complexity of the JSON structure, and the duplicated column is of no use, this approach works well. Described below:
One-time activity on the 7.3 LTS runtime
# A few columns were coming as duplicates in the raw file, e.g. languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused an error while creating the dataframe.
# However, we are able to read it in Databricks Runtime 7.3 LTS. Hence we used this runtime to read one file and write it to ADLS as a ONE-TIME activity.
# For all further runs, this file can be read with multiline as false; its schema is then used while reading the other new files (which in this case need multiline as true). This way Spark does not have to infer the schema on its own and hence does not throw the error even on higher runtime versions.
# We used a historical file from the initial delivery, which had a lot of records due to historical data. This ensures we cover all possibilities.
# Can be created again using a 7.3 LTS runtime cluster if this schema file is deleted.
dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile) # Can take any file to create the sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)
Now use this written file for all future runs, even on higher runtimes:
# Read sample which was created using 7.3 LTS runtime.
# The multiline does NOT have to be true for this.
# Get its schema and use it to read new files even on higher runtime without error which was caused due to duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema
# Read new json files using this schema by using `.schema()`. Works on higher runtimes as well since spark now does not have to create schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")