Spark DataFrame serialized as invalid json

Problem description

TL;DR: When I dump a Spark DataFrame as json, I always end up with something like

{"key1": "v11", "key2": "v21"}
{"key1": "v12", "key2": "v22"}
{"key1": "v13", "key2": "v23"}

which is invalid json. I can manually edit the dumped file to get something I can parse:

[
  {"key1": "v11", "key2": "v21"},
  {"key1": "v12", "key2": "v22"},
  {"key1": "v13", "key2": "v23"}
]

but I'm pretty sure I'm missing something that would let me avoid this manual edit. I just don't know what.

More details:

I have an org.apache.spark.sql.DataFrame and I try dumping it to json using the following code:

myDataFrame.write.json("file.json")

I also tried:

myDataFrame.toJSON.saveAsTextFile("file.json")

In both cases each row is dumped correctly, but the separating commas between rows are missing, as are the enclosing square brackets. Consequently, when I subsequently try to parse this file, the parser I use insults me and then fails.

I would be grateful to learn how I can dump valid json. (Reading the documentation of DataFrameWriter didn't provide me with any interesting hints.)

Solution

This is the expected output. Spark uses a JSON Lines-like format for a number of reasons:

  • It can be parsed and loaded in parallel.
  • Parsing can be done without loading the full file into memory.
  • It can be written in parallel.
  • It can be written without storing a complete partition in memory.
  • It is valid input even if the file is empty.
  • Finally, a Row in Spark is a struct, which maps to a JSON object, not an array.
  • ...
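
In particular, a file in this format is already valid input for Spark itself, since every line is a complete JSON document. A minimal round-trip sketch, assuming Spark 2.x with a SparkSession named spark (on 1.6, sqlContext.read.json behaves the same way) and the file.json path from the question:

// Each line of the JSON Lines output is parsed back into one Row.
val roundTripped = spark.read.json("file.json")
roundTripped.show()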

You can create the desired output in a few ways, but it will always conflict with one of the above.

You can, for example, write a single JSON document for each partition:

import org.apache.spark.sql.functions._

df
  // Group rows by the physical partition they belong to.
  .groupBy(spark_partition_id())
  // Pack all columns of each row into a struct, then collect the structs of each partition into an array.
  .agg(collect_list(struct(df.columns.map(col): _*)).alias("data"))
  .select($"data")
  .write
  .json(output_path)  // output_path is a placeholder for the destination directory
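
If the three sample rows from the question all land in one partition, the output of this job is a single line that is itself one complete JSON document holding the whole partition, roughly:

{"data":[{"key1":"v11","key2":"v21"},{"key1":"v12","key2":"v22"},{"key1":"v13","key2":"v23"}]}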

You could prepend the pipeline above with repartition(1) to get a single output file, but that is not something you want to do unless the data is very small.
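
For genuinely small data there is also a cruder route, not part of the original answer: collect the rows as JSON strings on the driver and wrap them in a plain JSON array by hand. This produces exactly the bracketed, comma-separated form the question asks for, but gives up every parallelism benefit listed above. A sketch, assuming the myDataFrame from the question and a hypothetical local output path:

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Only viable when the whole DataFrame fits comfortably in driver memory.
val jsonArray = myDataFrame.toJSON.collect().mkString("[", ",", "]")
Files.write(Paths.get("file_array.json"), jsonArray.getBytes(StandardCharsets.UTF_8))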

An alternative in Spark 1.6 would be glom:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Wrap the original row schema in a single array column named "data".
val newSchema = StructType(Seq(StructField("data", ArrayType(df.schema))))

sqlContext.createDataFrame(
  // glom turns each partition into one Array[Row]; empty partitions are dropped.
  df.rdd.glom().flatMap(a => if (a.isEmpty) Seq() else Seq(Row(a))),
  newSchema
)
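
The DataFrame this produces has a single array column data with one row per non-empty input partition, so writing it yields the same one-document-per-partition shape as the groupBy variant. A brief usage sketch with a placeholder name that is not in the original answer (wrapped stands for the createDataFrame result above):

// Each output line has the form {"data":[ ...rows of that partition... ]}.
wrapped.write.json("glom_output")   // hypothetical output path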
