Parsing Nested JSON into a Spark DataFrame Using PySpark


Problem Description

I would really love some help with parsing nested JSON data using PySpark-SQL. The data has the following schema (blank spaces are edits for confidentiality purposes...):

Schema

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1: string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- longitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)

My Goal

I would essentially want to get the data into the following data frame:

restaurant_id | latitude | longitude | timezone

What I Have Tried

The following code:

from pyspark.sql.functions import col, explode

dfj = spark.read.option("multiLine", False).json("/file/path")

result = dfj.select(col('restaurant_id'),
                    explode(col('location_info')).alias('location_info'))

# SQL operation
result.createOrReplaceTempView('result')

subset_data = spark.sql(
    '''
    SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
    FROM result
    '''
).show()

# Also tried this to read in, splitting the raw text on "{" into separate records
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
                              .values()
                              .flatMap(lambda x: x
                                       .replace("{", "#!#")
                                       .split("#!#")))

But oddly enough, it gives me the following only for the first object / restaurant id:

+-------------+--------+---------+--------------------+
|restaurant_id|latitude|longitude|           timestamp|
+-------------+--------+---------+--------------------+
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
+-------------+--------+---------+--------------------+

My research indicated that this may have something to do with the way the JSON files are structured at the source. For example:

{}{
}{
}

Thereby not being multi-line, or something along those lines. I'm wondering what to do about this as well.

Thank you very much for reading; any help would really be appreciated. I know I can always count on SO to be helpful.

Recommended Answer

The spark.read.json() reader assumes one JSON object per text line. I'm not sure I follow the insertion of the \n and then the split... it sounds like maybe the file is malformed?
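For illustration, here is a minimal sketch (the paths and records are made up for this example) of the line-delimited layout the default reader expects, versus reading a pretty-printed file with the multiLine option:

# Line-delimited ("JSON Lines"): one complete object per line -- the default expectation.
# records.jsonl:
#   {"restaurant_id": "25", "location_info": [{"latitude": "2.0", "longitude": "-8.0"}]}
#   {"restaurant_id": "26", "location_info": [{"latitude": "3.0", "longitude": "-9.0"}]}
df_lines = spark.read.json("/tmp/records.jsonl")  # hypothetical path

# If each object spans several lines (pretty-printed JSON), enable multiLine instead.
df_multi = spark.read.option("multiLine", True).json("/tmp/records_pretty.json")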

Perhaps there is a record separator such as \r which you can't see. The Linux command od -c <file name> | head -10 will help show what the characters are in between records.

If the schema is well known, then supply that schema object; this skips the first pass over the data that does schema inference, e.g. spark.read.schema(schema).json('path to directory'), and will definitely make your read operation much faster. Save the objects in Parquet or Delta Lake format for better performance if you need to query them later.
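As a rough sketch of that suggestion (field names are taken from the schema in the question, the redacted/unused fields are omitted, and the input/output paths are placeholders), defining and supplying an explicit schema could look like this:

from pyspark.sql.functions import col, explode
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Schema for one element of the location_info array (only the visible fields).
location_schema = StructType([
    StructField("restaurant_type", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timezone", StringType(), True),
])

schema = StructType([
    StructField("restaurant_id", StringType(), True),
    StructField("location_info", ArrayType(location_schema, True), True),
])

# Supplying the schema skips the inference pass over the data.
dfj = spark.read.schema(schema).json("/file/path")

# Flatten the array and pull out the target columns.
flat = (dfj
        .select(col("restaurant_id"), explode("location_info").alias("loc"))
        .select("restaurant_id", "loc.latitude", "loc.longitude", "loc.timezone"))

# Write out as Parquet for faster downstream queries (assumed output location).
flat.write.mode("overwrite").parquet("/output/path")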

Databricks' COPY INTO or the cloudFiles (Auto Loader) format will speed up ingestion / reduce latency: https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html
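For reference, a minimal Auto Loader sketch (Databricks-only; the input, checkpoint, and output paths are assumptions, and it reuses the schema object defined above):

# Databricks Auto Loader: incrementally ingest new JSON files as they arrive.
stream = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .schema(schema)                       # schema defined earlier
          .load("/mnt/raw/restaurants/"))       # assumed input directory

(stream.writeStream
 .format("delta")
 .option("checkpointLocation", "/mnt/chk/restaurants/")  # assumed checkpoint path
 .start("/mnt/delta/restaurants/"))                      # assumed output path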

