Spark DataFrame serialized as invalid json
Question
TL;DR: When I dump a Spark DataFrame
as json, I always end up with something like
{"key1": "v11", "key2": "v21"}
{"key1": "v12", "key2": "v22"}
{"key1": "v13", "key2": "v23"}
which is invalid json. I can manually edit the dumped file to get something I can parse:
[
{"key1": "v11", "key2": "v21"},
{"key1": "v12", "key2": "v22"},
{"key1": "v13", "key2": "v23"}
]
but I'm pretty sure I'm missing something that would let me avoid this manual edit. I just don't know what.
More details:
I have an org.apache.spark.sql.DataFrame
and I try dumping it to JSON using the following code:
myDataFrame.write.json("file.json")
I also tried:
myDataFrame.toJSON.saveAsTextFile("file.json")
In both cases it correctly dumps each row, but it is missing the separating commas between rows, as well as the enclosing square brackets. Consequently, when I subsequently try to parse this file, the parser I use insults me and then fails.
I would be grateful to learn how I can dump valid JSON. (Reading the documentation of DataFrameWriter didn't provide me with any interesting hints.)
Answer
This is the expected output. Spark uses a JSON Lines-like format for a number of reasons:
- It can be parsed and loaded in parallel.
- Parsing can be done without loading the full file into memory.
- It can be written in parallel.
- It can be written without storing a complete partition in memory.
- It is valid input even when the file is empty.
- Finally, a Row in Spark is a struct, which maps to a JSON object, not an array.
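Because every line is a self-contained JSON document, Spark can read the output straight back. A minimal sketch, assuming the file was written with myDataFrame.write.json("file.json") as in the question and that an sqlContext is available:

// Parses one JSON object per line and infers the schema back
val reloaded = sqlContext.read.json("file.json")
reloaded.show()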
You can create the desired output in a few ways, but it will always conflict with at least one of the above.
You can, for example, write a single JSON document for each partition:
import org.apache.spark.sql.functions._

// Collect all rows of each partition into a single "data" array column,
// so every partition is written as one JSON document.
// Assumes the SQL implicits are in scope for the $"..." syntax.
df
  .groupBy(spark_partition_id())
  .agg(collect_list(struct(df.columns.map(col): _*)).alias("data"))
  .select($"data")
  .write
  .json(output_path)
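With the sample rows from the question, each output part file would then contain a single JSON document along these lines (illustrative only; the exact split depends on how the rows are partitioned):

{"data":[{"key1":"v11","key2":"v21"},{"key1":"v12","key2":"v22"}]}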
You could prepend this with repartition(1)
to get a single output file, but it is not something you want to do unless the data is very small.
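If the data really is small enough to collect to the driver, you can also bypass the distributed writer entirely and build one valid JSON array yourself. A minimal sketch, assuming myDataFrame from the question and a hypothetical local output path single.json:

import java.nio.file.{Files, Paths}

// Collect the per-row JSON strings and wrap them in brackets
// to get a single, valid JSON array in one local file
val jsonArray = myDataFrame.toJSON.collect().mkString("[", ",", "]")
Files.write(Paths.get("single.json"), jsonArray.getBytes("UTF-8"))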
A Spark 1.6 alternative to the spark_partition_id approach would be glom:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Wrap each partition's rows into a single-field Row whose "data" column
// is an array of the original rows; empty partitions are dropped.
val newSchema = StructType(Seq(StructField("data", ArrayType(df.schema))))

sqlContext.createDataFrame(
  df.rdd.glom().flatMap(a => if (a.isEmpty) Seq() else Seq(Row(a))),
  newSchema
)
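Writing that DataFrame out then produces the same one document per partition, with the rows nested under data (a sketch; wrapped is a hypothetical name for the DataFrame created above, and output_path is a placeholder as before):

val wrapped = sqlContext.createDataFrame(
  df.rdd.glom().flatMap(a => if (a.isEmpty) Seq() else Seq(Row(a))),
  newSchema
)
wrapped.write.json(output_path)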