Write Spark dataframe as CSV with partitions


Question

I'm trying to write a DataFrame in Spark to an HDFS location, and I expect that if I add the partitionBy notation, Spark will create partition folders (similar to writing in Parquet format) in the form of

partition_column_name=partition_value

(i.e. partition_date=2016-05-03). To do so, I ran the following command:

(df.write
    .partitionBy('partition_date')
    .mode('overwrite')
    .format("com.databricks.spark.csv")
    .save('/tmp/af_organic'))

but the partition folders had not been created. Any idea what I should do in order for the Spark DataFrame to automatically create those folders?

Thanks,

Solution

Spark 2.0.0+:

The built-in csv format supports partitioning out of the box, so you should be able to simply use:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

without including any extra packages.
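
For completeness, the partitioned output can be read straight back, with partition discovery restoring partition_date as a column. A minimal sketch, assuming Spark 2.x with a SparkSession named spark and the /tmp/af_organic path from the question:

df_back = (spark.read
    .format("csv")
    .option("inferSchema", "true")
    .load("/tmp/af_organic"))

# the partition_date=... directories are discovered automatically
# and show up as a partition_date column in the schema
df_back.printSchema()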

Spark < 2.0.0:

At this moment (v1.4.0) spark-csv doesn't support partitionBy (see databricks/spark-csv#123), but you can adjust the built-in sources to achieve what you want.

You can try two different approaches. Assuming your data is relatively simple (no complex strings or need for character escaping) and looks more or less like this:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

You can manually prepare values for writing:

from pyspark.sql.functions import col, concat_ws

# keep the key column and collapse the remaining columns into a single
# comma-separated string
key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

and use the text source:

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)

In more complex cases you can try to use a proper CSV parser to preprocess the values in a similar way, either by using a UDF or by mapping over the RDD, but it will be significantly more expensive.
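
A rough sketch of that UDF variant, only as an illustration and assuming Python 3 (the standard csv module handles the quoting); the helper name to_csv_line and the /tmp/foo_escaped path are made up for this example, and df is the toy DataFrame defined above:

import csv
import io

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def to_csv_line(*values):
    # let the csv module take care of quoting and escaping a single row
    buf = io.StringIO()
    csv.writer(buf).writerow(values)
    return buf.getvalue().rstrip("\r\n")

to_csv_udf = udf(to_csv_line, StringType())

kvs = df.select(
    col("k"),
    to_csv_udf(*[col(x) for x in df.columns[1:]]).alias("value"))

kvs.write.partitionBy("k").text("/tmp/foo_escaped")

Compared to concat_ws this pushes every row through Python, which is where the extra cost comes from.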

If CSV format is not a hard requirement, you can also use the JSON writer, which supports partitionBy out of the box:

df.write.partitionBy("k").json("/tmp/bar")

as well as partition discovery on read.
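
For example (a minimal sketch, reusing the /tmp/bar path written above and the same sqlContext as in the rest of the answer):

df_bar = sqlContext.read.json("/tmp/bar")

# partition discovery turns the k=... directories back into a k column
print("k" in df_bar.columns)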
