如何在PySpark中读取大型JSON数组文件 [英] How do I read a Large JSON Array File in PySpark

查看:17
本文介绍了如何在PySpark中读取大型JSON数组文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题

我最近在Azure Data Lake Analytics遇到了一个挑战,当时我试图读入一个大型的UTF-8 JSON数组文件,并切换到HDInsight PySpark(v2.x,而不是3)来处理该文件。该文件大小约为110G,具有约150M个JSON对象。

HDInsight PySpark似乎不支持数组的JSON文件格式的输入,所以我被卡住了。另外,我还有"许多"这样的文件,每个文件都有不同的模式,每个文件都包含数百列,所以现在不能为这些文件创建模式。

问题

如何在HDInsight上使用PySpark 2中的开箱即用功能以使这些文件能够被读取为JSON?

谢谢,

J

我尝试的内容

我使用了此页面底部的方法: from Databricks提供了以下代码片段:

import json

df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)

我尝试了上述操作,但不了解"WalleTextFiles"的工作原理,当然也遇到了OutOfMemory错误,这些错误很快杀死了我的执行者。

我尝试加载到RDD和其他打开方法,但PySpark似乎只支持JSONLines JSON文件格式,并且由于Adla对该文件格式的要求,我有JSON对象数组。

我尝试将其作为文本文件读入,剥离数组字符,拆分JSON对象边界并转换为JSON,但始终出现无法转换Unicode和/或str(Ings)的错误。

我找到了解决上述问题的方法,并将其转换为一个数据帧,该数据帧包含一个列,其中包含JSON对象的多行字符串行。然而,我没有找到一种仅将数据框行中的JSON字符串单独输出到输出文件的方法。结果总是

{'dfColumnName':'{...json_string_as_value}'}

我还尝试了一个map函数,它接受上面的行,解析为JSON,提取值(我想要的JSON),然后将值解析为JSON。这似乎是可行的,但当我尝试保存时,RDD是PipelineRDD类型,并且没有saveAsTextFile()方法。然后,我尝试了toJSON方法,但不断收到关于"未找到有效的JSON对象"的错误,当然,我也不理解这些错误,当然还有其他转换错误。

推荐答案

我终于找到了前进的方向。我了解到我可以直接从RDD中读取JSON,包括一个管道RDD。我找到了一种方法来删除Unicode字节序头,将数组方括号括起来,根据幸运的分隔符拆分JSON对象,并拥有分布式数据集以实现更高效的处理。输出数据帧现在具有以JSON元素命名的列、推断架构并动态适应其他文件格式。

以下是代码-希望它能有所帮助!:

#...Spark considers arrays of Json objects to be an invalid format
#    and unicode files are prefixed with a byteorder marker
#
thanksMoiraRDD = sc.textFile( '/a/valid/file/path', partitions ).map(
    lambda x: x.encode('utf-8','ignore').strip(u",
[]ufeff") 
)

df = sqlContext.read.json(thanksMoiraRDD)

这篇关于如何在PySpark中读取大型JSON数组文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆