write spark dataframe as array of json (pyspark)
Problem description
I would like to write my spark dataframe as a set of JSON files, in particular with each file being an array of JSON objects. Let me explain with a simple (reproducible) example.
We have:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # a no-op in the pyspark shell, where `spark` already exists
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))
We save the dataframe as:
df.write.json('s3://path/to/json')
Each file just created has one JSON object per line, something like:
{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}
but I would like to have an array of those JSON objects per file:
[
{"x":0.9953802385540144,"y":0.476027611419198},
{"x":0.929599290575914,"y":0.72878523939521},
{"x":0.951701684432855,"y":0.8008064729546504}
]
Recommended answer
It is not currently possible to have spark "natively" write a single file in your desired format, because spark works in a distributed (parallel) fashion, with each executor writing its part of the data independently.
However, since you are okay with having each file be an array of JSON objects (rather than requiring one single file), here is a workaround you can use to achieve the desired output:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

(
    df.select(to_json(struct(*df.columns)).alias("json"))  # serialize each row to one JSON string
    .groupBy(spark_partition_id())                         # group rows by the partition they live on
    .agg(collect_list("json").alias("json_list"))          # collect each partition's JSON strings into an array
    .select(col("json_list").cast("string"))               # cast array<string> to "[{...}, {...}, ...]"
    .write.text("s3://path/to/json")                       # one text file per partition
)
First you create a json from all of the columns in df. Then group by the spark partition ID and aggregate using collect_list. This will put all the jsons on that partition into a list. Since you're aggregating within the partition, there should be no shuffling of data required.
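If you want to know in advance how many arrays (i.e. non-empty output rows) to expect, you can count the distinct partition IDs first; a minimal sketch using the same df (the pid alias is just illustrative):

from pyspark.sql.functions import spark_partition_id

# one output row -- and hence one JSON array -- is produced per distinct partition ID
df.select(spark_partition_id().alias("pid")).distinct().count()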
Now select the list column, cast it to a string, and write it out as a text file.
Here's an example of how one file looks:
[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]
Note that you may get some empty files.
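Empty files come from empty partitions. One way to avoid them, sketched here under the assumption that you know a sensible partition count for your data, is to repartition before applying the transformation above:

# round-robin repartition spreads rows evenly, so no partition (and no output file) is empty;
# 4 is an arbitrary illustrative value
df = df.repartition(4)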
Presumably you could force spark to write the data in ONE file if you specified an empty groupBy, but this would mean forcing all of the data into a single partition, which could result in an out-of-memory error.
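For completeness, that single-file variant would look something like this (a sketch only, not recommended for large data, since every row is funneled through one partition; the output path is illustrative):

from pyspark.sql.functions import to_json, collect_list, col, struct

(
    df.select(to_json(struct(*df.columns)).alias("json"))
    .groupBy()                                       # empty groupBy: all rows fall into a single group
    .agg(collect_list("json").alias("json_list"))
    .select(col("json_list").cast("string"))
    .coalesce(1)                                     # ensure a single part file is written
    .write.text("s3://path/to/json_single")
)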