根据特定列对Spark Dataframe进行分区,并将每个分区的内容转储到CSV上 [英] Partition a Spark Dataframe based on a specific column and dump the content of each partition on a csv

查看:703
本文介绍了根据特定列对Spark Dataframe进行分区,并将每个分区的内容转储到CSV上的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用spark 1.6.2 Java API将数据加载到Dataframe DF1中,如下所示:

I'm using spark 1.6.2 Java APIs to load some data in a Dataframe DF1 that looks like:

Key  Value
A    v1
A    v2
B    v3
A    v4

现在,我需要基于键"列中的值子集对DF1进行分区,并将每个分区转储到csv文件中(使用spark-csv).

Now I need to partition DF1 based on a subset of value in column "Key" and dump each partition to a csv file (using spark-csv).

所需的输出:

A.csv

Key Value
A   v1
A   v2
A   v4

B.csv

Key Value
B   v3

目前,我正在做的是构建一个HashMap(myList),其中包含我需要过滤的值的子集,然后在每次迭代中迭代该过滤不同的Key.通过以下代码,我得到了想要的东西,但我想知道是否有更有效的方法可以做到这一点:

At the moment what I'm doing is building an HashMap (myList) containing the subset of values that i need to filter and then iterate through that filtering a different Key each iteration. With the following code I get what I want but I'm wondering if there is a more efficient way to do that:

DF1 = <some operations>.cache();

for (Object filterKey: myList.keySet()) {
  DF2 = DF1.filter((String)myList.get(filterKey));

  DF2.write().format.format("com.databricks.spark.csv")
            .option("header", "true")
      .save("/" + filterKey + ".csv");
}

推荐答案

您快到了,只需添加partitionBy,它将以您想要的方式对文件进行分区.

You are almost there, you just need to add the partitionBy, which will partition the files in the way you want.

DF1
  .filter{case(key, value) => myList.contains(key))
  .write
  .partitionBy("key")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/my/basepath/")

文件现在将存储在"/my/basepath/key = A/","/my/basepath/key = B/"等下.

The files will now be stored under "/my/basepath/key=A/", "/my/basepath/key=B/", etc..

这篇关于根据特定列对Spark Dataframe进行分区,并将每个分区的内容转储到CSV上的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆