Divide Spark DataFrame data into separate files


Problem Description

I have the following DataFrame input from an S3 file and need to transform the data into the desired output below. I am using Spark version 1.5.1 with Scala, but could change to Spark with Python. Any suggestions are welcome.

DataFrame Input:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

Desired Output:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$ cat bob/mouse/file.csv
bbbbb
ccccc

terminal$ cat bob/dog/file.csv
ddddd

Here is my existing Spark Scala code that I have tried:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
// Group by (name, animal) and print up to 100 group counts
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)

Current Output:

[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]

Some of the problems with my existing code are that groupBy returns a GroupedData object, and I probably don't want to do a count/sum/agg function on that data. I am looking for a better technique to group and output the data. The dataset is very large.

Recommended Answer

This can be achieved using the partitionBy method of the DataFrameWriter. The general syntax is as follows:

df.write.partitionBy("name", "animal").format(...).save(...)

Unfortunately, the only plain text format which supports partitioning in Spark 1.5 is JSON.

If you can update your Spark installation to:


  • 1.6 - you can use partitionBy with the text format. 1.6 is also required if you need a single output file per group (repartition); see the sketch after this list.
  • 2.0 - you can use partitionBy with the csv format.
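
A minimal sketch of the 1.6+ approach, assuming the columns from the question, a placeholder output path /prefix, and import sqlc.implicits._ for the $"col" syntax:

// Spark 1.6+: one directory per (name, animal); repartition yields a single output file per group
df.select($"name", $"animal", $"data")
  .repartition($"name", $"animal")
  .write
  .partitionBy("name", "animal")
  .text("/prefix")

Note that partitionBy writes Hive-style directories such as /prefix/name=bob/animal=mouse/ rather than the bob/mouse layout shown in the question, so a rename step may still be needed.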

I believe that in 1.5 your best option is to write the files as JSON and then convert the individual output files.
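
A sketch of that 1.5 workaround, with /prefix again as a placeholder path:

// Spark 1.5: JSON is the only plain text source that supports partitionBy
df.write
  .partitionBy("name", "animal")
  .json("/prefix")

Each resulting /prefix/name=<name>/animal=<animal>/ directory contains JSON records holding only the data field (the partition columns are encoded in the path), which can then be converted to plain text outside of Spark.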

If the number of distinct (name, animal) combinations is small, you can try to perform a separate write for each group:

import org.apache.spark.sql.Row
import sqlc.implicits._  // enables the $"col" column syntax

// Collect the distinct (name, animal) pairs to the driver
val dist = df.select("name", "animal").distinct.rdd.collect.map {
  case Row(name: String, animal: String) => (name, animal)
}

// One write per group; the "csv" format requires the external spark-csv package on Spark 1.x
for {
  (name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")

but this won't scale as the number of combinations grows.
