Divide Spark DataFrame data into separate files
Question
I have the following DataFrame input from an S3 file and need to transform the data into the desired output below. I am using Spark version 1.5.1 with Scala, but could change to Spark with Python. Any suggestions are welcome.
DataFrame Input:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
Desired Output:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
Here is my existing Spark Scala code that I have tried:
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)

// Read the raw input as JSON
val df = sqlc.read.json("raw.gz")

// Group by (name, animal) and print up to 100 group counts
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
Current Output:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
One problem with my existing code is that groupBy returns a GroupedData object, and I don't really want to run a count/sum/agg function on that data. I am looking for a better technique to group and output the data. The dataset is very large.
Answer
This can be achieved using the partitionBy option of the DataFrameWriter. The general syntax is as follows:
df.write.partitionBy("name", "animal").format(...).save(...)
Unfortunately, the only plain text format that supports partitioning in Spark 1.5 is JSON.
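In 1.5, a minimal sketch would therefore write partitioned JSON (the /prefix output path here is an assumption for illustration, not from the question):

// Writes one directory per (name, animal) pair, e.g.
// /prefix/name=bob/animal=mouse/part-*, with one JSON record per line
df.write.partitionBy("name", "animal").json("/prefix")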
If you can update your Spark installation:
- 1.6 - you can use partitionBy with the text format (see the sketch after this list). 1.6 is also required if you need a single output file per group (repartition).
- 2.0 - you can use partitionBy with the csv format.
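For the 1.6 route, a minimal sketch under the same assumed /prefix path could look like this; the text source writes the single remaining string column (data), since the partition columns are lifted into the directory structure:

// Repartition by the grouping columns so each group lands in a single
// shuffle partition, which yields one output file per directory
df.repartition($"name", $"animal")
  .write.partitionBy("name", "animal")
  .text("/prefix")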
I believe that in 1.5 your best option is to write the files as JSON and then convert the individual output files.
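A post-processing sketch for one group might look like this (the directory layout follows from partitionBy; the /out path is an assumption):

// Read one partition directory back and keep only the payload column
val part = sqlc.read.json("/prefix/name=bob/animal=mouse")
part.select("data").rdd.map(_.getString(0)).saveAsTextFile("/out/bob/mouse")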
If the number of distinct ('name', 'animal') pairs is small, you can try to perform a separate write for each group:
import org.apache.spark.sql.Row
import sqlc.implicits._ // for the $"col" syntax

// Collect the distinct (name, animal) pairs to the driver
val dist = df.select("name", "animal").rdd.collect.map {
  case Row(name: String, animal: String) => (name, animal)
}

// One filtered write per pair; format("csv") assumes the spark-csv
// package is on the classpath, since csv is not built in before 2.0
for {
  (name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")
but this won't scale as the number of combinations grows, since each pair triggers a separate pass over the data.