Write Spark dataframe as array of JSON (PySpark)


Problem description

I would like to write my Spark dataframe as a set of JSON files, and in particular to have each file written as an array of JSON. Let me explain with a simple (reproducible) example.

We have:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

Saving the dataframe as:

df.write.json('s3://path/to/json')

Each file just created has one JSON object per line, something like:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

but I would like to have an array of those JSON objects per file:

[
   {"x":0.9953802385540144,"y":0.476027611419198},
   {"x":0.929599290575914,"y":0.72878523939521},
   {"x":0.951701684432855,"y":0.8008064729546504}
]

Recommended answer

It is not currently possible to have Spark "natively" write a single file in your desired format, because Spark works in a distributed (parallel) fashion, with each executor writing its part of the data independently.
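
As a quick illustration (my addition, not part of the original answer), the number of part files produced by df.write.json generally mirrors the number of partitions of the dataframe, which you can inspect before writing:

# Each partition is written out by its executor as a separate part file,
# so this count roughly predicts how many output files to expect.
print(df.rdd.getNumPartitions())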

However, since you are okay with each file being an array of JSON rather than having only [one] file, here is one workaround you can use to achieve your desired output:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .write.text("s3://path/to/json")

First you create a json from all of the columns in df. Then group by the Spark partition ID and aggregate using collect_list; this puts all the jsons on that partition into a list. Since you are aggregating within the partition, no shuffling of data should be required.

Now select the list column, cast it to a string, and write it out as a text file.

Here's an example of how one file looks:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

Note that you may get some empty files.
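
My assumption is that the empty files come from shuffle partitions of the groupBy that received no groups, so coalescing the result before writing should reduce them. A minimal sketch (the output path is just the same placeholder as above):

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

# Coalesce down to (at most) the original partition count so that
# shuffle partitions holding no groups do not produce empty files.
n_parts = df.rdd.getNumPartitions()

df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .coalesce(n_parts)\
    .write.text("s3://path/to/json")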

Presumably you could force Spark to write the data to ONE file by specifying an empty groupBy, but this would force all of the data into a single partition, which could result in an out-of-memory error.
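
For completeness, here is a sketch of that single-file variant, mirroring the code above with an empty groupBy. It is only an illustration and is only safe when the whole dataset fits in a single executor's memory; the output path is again just a placeholder:

from pyspark.sql.functions import to_json, collect_list, col, struct

# An empty groupBy aggregates everything into a single group, so the
# result is one row in one partition and is written as a single file.
df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy()\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .write.text("s3://path/to/json")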

