Why is Spark saveAsTable with bucketBy creating thousands of files?
Context
Spark 2.0.1, spark-submit in cluster mode. I am reading a parquet file from hdfs:
val spark = SparkSession.builder
.appName("myApp")
.config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
.config("spark.sql.sources.bucketing.enabled", true)
.enableHiveSupport()
.getOrCreate()
val df = spark.read
.format("parquet")
.load("hdfs://XXX.XX.X.XX/myParquetFile")
I am saving the df
to a hive table with 50 buckets grouped by userid
:
df.write
.bucketBy(50, "userid")
.saveAsTable("myHiveTable")
Now, when I look into the hive warehouse at my hdfs /user/hive/warehouse
there is a folder named myHiveTable
. Inside it are a bunch of part-*.parquet
files. I would expect there to be 50 files. But no, there are 3201 files!!!! There are 64 files per partition, why? There are different number of files per partitions for different files I saved as hive table. All the files are very small, just tens of Kb each!
Let me add, that number of different userid
is about 1 000 000
in myParquetFile
.
Question
Why are there 3201 files in the folder instead of 50? What are they?
When I read this table back into DataFrame and print number of partitions:
val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)
The number of partitions is correctly 50, and I confirmed that the data is correctly partitioned by userid
.
For one of my large datasets (3 TB) I created a table with 1000 buckets, which created literally about a million files! This exceeds the directory item limit of 1048576 and gives org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
Question
What does the number of files created depend on?
Question
Is there a way to limit number of files created?
Question
Should I worry about these files? Does having all of them hurt the performance of df2
? It is always said that we should not create too many partitions because it is problematic.
Question
I found in HIVE Dynamic Partitioning tips that the number of files might be related to the number of mappers. It is suggested to use distribute by
when inserting into a hive table. How could I do that in Spark?
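For reference, Spark SQL does accept a DISTRIBUTE BY clause directly, so the tip above can be tried as-is. A minimal sketch, assuming the parquet DataFrame is first registered as a temporary view (the view name here is hypothetical):

```scala
// Register the parquet data as a view so it can be referenced in SQL
// (the view name "my_parquet_view" is made up for this sketch).
df.createOrReplaceTempView("my_parquet_view")

// DISTRIBUTE BY shuffles rows so that all rows sharing a userid land
// in the same writing task, analogous to Hive's distribute by.
spark.sql(
  """INSERT INTO myHiveTable
    |SELECT * FROM my_parquet_view
    |DISTRIBUTE BY userid""".stripMargin)
```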
Question
If the problem is indeed as in the link above, then here, How to control the file numbers of hive table after inserting data on MapR-FS, they suggest using options such as hive.merge.mapfiles
or hive.merge.mapredfiles
to merge all the small files after the map reduce job. Are there options for this in Spark?
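As far as I know Spark has no direct counterpart to the hive.merge.* post-job compaction options, but the usual substitute is to cap the number of writing tasks before the write itself, since each task emits its own files. A sketch, not from the question's setup — the coalesce target of 10 is an arbitrary assumption:

```scala
import org.apache.spark.sql.SaveMode

// coalesce() reduces the number of writing tasks without a full
// shuffle; fewer tasks means fewer (and larger) output files.
df.coalesce(10)
  .write
  .mode(SaveMode.Overwrite)
  .parquet("hdfs://XXX.XX.X.XX/myCompactedOutput")
```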
Answer
Please use Spark SQL, which will use HiveContext to write data into the Hive table, so it will use the number of buckets that you configured in the table schema.
val spark = SparkSession.builder()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("hive.execution.engine", "tez")
  .config("hive.exec.max.dynamic.partitions", "400")
  .config("hive.exec.max.dynamic.partitions.pernode", "400")
  .config("hive.enforce.bucketing", "true")
  .config("hive.optimize.sort.dynamic.partition", "true")
  .config("hive.vectorized.execution.enabled", "true")
  .config("hive.enforce.sorting", "true")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("insert into hiveTableName partition (partition_column) select * from myParquetFile")
Spark's bucketing implementation does not honor the specified number of buckets as a file count. Each writing task produces a separate file for every bucket it holds rows for, so you end up with tasks × buckets files — for example, roughly 64 input partitions × 50 buckets accounts for the 3201 files you observed.
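Following that explanation, one way to get close to one file per bucket is to shuffle the data by the bucketing column into exactly as many partitions as there are buckets before writing, so each task holds a single bucket's rows. This is a sketch, not part of the original answer, and it relies on Spark using the same hash partitioning expression for repartition and for its native bucketing:

```scala
import org.apache.spark.sql.functions.col

// Repartition into 50 partitions keyed by userid so that each
// writing task holds (at most) one bucket's worth of rows; the
// write then emits ~one file per bucket instead of tasks x buckets.
df.repartition(50, col("userid"))
  .write
  .bucketBy(50, "userid")
  .sortBy("userid")
  .saveAsTable("myHiveTable")
```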
Please refer this link https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil
Hope this helps.
Ravi