Write spark dataframe as CSV with partitions


Problem Description

I'm trying to write a dataframe in Spark to an HDFS location, and I expect that if I add the 'partitionBy' notation, Spark will create partition folders (similar to writing in Parquet format) in the form "partition_column_name=partition_value" (i.e. partition_date=2016-05-03). To do so, I ran the following command:

df.write.partitionBy('partition_date').mode('overwrite').format("com.databricks.spark.csv").save('/tmp/af_organic')

but the partition folders were not created. Any idea what I should do in order for the Spark DF to automatically create those folders?

Thanks,

Solution

Spark 2.0.0+:

The built-in csv format supports partitioning out of the box, so you should be able to simply use:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

without including any additional packages.
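
For example, a minimal sketch reusing the path and partition column from the question ('overwrite' is just one possible mode), which produces one subdirectory per partition value:

df.write \
    .partitionBy('partition_date') \
    .mode('overwrite') \
    .format('csv') \
    .save('/tmp/af_organic')

# Resulting layout on HDFS, one folder per distinct partition_date:
# /tmp/af_organic/partition_date=2016-05-03/part-*.csv
# /tmp/af_organic/partition_date=<other values>/...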

Spark < 2.0.0:

At this moment (v1.4.0) spark-csv doesn't support partitionBy (see databricks/spark-csv#123), but you can adjust the built-in sources to achieve what you want.

You can try two different approaches. Assuming your data is relatively simple (no complex strings and no need for character escaping) and looks more or less like this:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

You can manually prepare values for writing:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

and write using the text source:

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)

In more complex cases you can try to use a proper CSV parser to preprocess values in a similar way, either by using a UDF or by mapping over the RDD, but it will be significantly more expensive.
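
As a rough sketch of the UDF variant (assuming Python 3; the to_csv_line helper and the /tmp/foo_escaped path are purely illustrative, not part of Spark or spark-csv):

import csv
import io

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def to_csv_line(*fields):
    # Serialize one row's values into a single, properly escaped CSV line.
    buf = io.StringIO()
    csv.writer(buf).writerow(fields)
    return buf.getvalue().rstrip("\r\n")

to_csv_udf = udf(to_csv_line, StringType())

# Same pattern as above, but the values are escaped by the csv module.
kvs = df.select(col("k"), to_csv_udf(*[col(x) for x in df.columns[1:]]))
kvs.write.partitionBy("k").text("/tmp/foo_escaped")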

If CSV format is not a hard requirement, you can also use the JSON writer, which supports partitionBy out of the box:

df.write.partitionBy("k").json("/tmp/bar")

as well as partition discovery on read.
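
For completeness, a minimal sketch of the read side (the path matches the JSON example above; the partition column k is reconstructed from the k=... directory names during partition discovery):

df_bar = sqlContext.read.json("/tmp/bar")
df_bar.printSchema()   # x1, x2, x3 plus the partition column k
df_bar.filter(df_bar.k == "foo").show()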
