PySpark Number of Output Files


Problem Description


I am a Spark newbie. I have a simple PySpark script. It reads a JSON file, flattens it, and writes it to an S3 location as a compressed Parquet file.

The read and transformation steps run very fast and use 50 executors (which I set in the conf). But the write stage takes a long time and writes only one large file (480MB).

How is the number of files saved decided? Can the write operation be sped up somehow?

Thanks, Ram.

Solution

The number of output files is equal to the number of partitions of the RDD being saved. In the example below, the RDD is repartitioned to control the number of output files.

Try:

repartition(numPartitions) - Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test")


The number of output files is the same as the number of partitions of the RDD.

$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test
Found 3 items
-rw-r--r--   1 cloudera cloudera          0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS
-rw-r--r--   1 cloudera cloudera    1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000
-rw-r--r--   1 cloudera cloudera    1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001
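
The same idea applies to the Parquet write in the question: repartition the DataFrame before writing so that more than one task (and therefore executor) writes a part file. A minimal sketch, assuming the flattened data is in a DataFrame named df and that the S3 destination is a placeholder:

df.repartition(50) \
  .write \
  .mode("overwrite") \
  .parquet("s3a://your-bucket/flattened/")  # hypothetical bucket/prefix

Each partition is written by its own task, so with 50 partitions the write can use all 50 executors instead of producing a single 480MB file.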

Also check this: coalesce(numPartitions)

source-1 | source-2
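
If you only need to reduce the number of partitions (and therefore output files), coalesce avoids the full shuffle that repartition always performs. A small sketch, reusing the RDD from above with a hypothetical output path:

>>> dataRDD.coalesce(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_coalesced")  # hypothetical path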


Update:

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

... but this is only the minimum number of partitions, so it is not guaranteed.
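
For completeness, the optional second argument mentioned above can be passed like this; as noted, it is only a lower bound on the partition count, not a guarantee:

>>> dataRDD = sc.textFile("/user/cloudera/sqoop_import/orders", minPartitions=4)  # minPartitions is only a hint / lower bound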

So if you want to control the partitioning on read, you should use this:

dataRDD = sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2)
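
You can confirm the resulting partition count (and hence the number of output files) with getNumPartitions:

>>> dataRDD.getNumPartitions()
2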
