Why is Spark saveAsTable with bucketBy creating thousands of files?

Problem Description

Context

Spark 2.0.1, spark-submit in cluster mode. I am reading a parquet file from hdfs:

val spark = SparkSession.builder
      .appName("myApp")
      .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
      .config("spark.sql.sources.bucketing.enabled", true)
      .enableHiveSupport()
      .getOrCreate()

val df = spark.read
              .format("parquet")
              .load("hdfs://XXX.XX.X.XX/myParquetFile")

I am saving the df to a hive table with 50 buckets grouped by userid:

df0.write
   .bucketBy(50, "userid")
   .saveAsTable("myHiveTable")

Now, when I look into the Hive warehouse on HDFS at /user/hive/warehouse, there is a folder named myHiveTable. Inside it are a bunch of part-*.parquet files. I would expect there to be 50 files. But no, there are 3201 files!!!! There are 64 files per partition, why? The number of files per partition also differs between the different files I saved as Hive tables. All the files are very small, just tens of Kb each!

Let me add that the number of distinct userid values in myParquetFile is about 1,000,000.
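If it helps, here is the arithmetic as I understand it (a rough sketch only; df0 is the DataFrame being written, as above): with bucketing, each write task can emit one file per bucket it holds rows for, so the file count is roughly the number of write tasks times the number of buckets.

// Rough check of where a number like 3201 can come from:
// files ≈ write tasks x buckets (plus any metadata files).
val writeTasks = df0.rdd.getNumPartitions // if this is 64, then 64 * 50 = 3200
val buckets = 50
println(writeTasks * buckets)             // ≈ the 3201 files observed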

Question

Why are there 3201 files in the folder instead of 50! What are they?

When I read this table back into a DataFrame and print the number of partitions:

val df2 = spark.sql("SELECT * FROM myHiveTable") 
println(df2.rdd.getNumPartitions)

The number of partitions is correctly 50, and I confirmed that the data is correctly partitioned by userid.
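One way to double-check that, in case it is useful (just a sketch; it tests that every userid lands in a single partition of df2):

import org.apache.spark.sql.functions.{col, countDistinct, spark_partition_id}

// Count how many distinct partitions each userid appears in;
// a result of 0 means no userid is spread over more than one partition.
val spread = df2
  .select(col("userid"), spark_partition_id().alias("pid"))
  .groupBy("userid")
  .agg(countDistinct("pid").alias("num_partitions"))
  .filter(col("num_partitions") > 1)
  .count()
println(spread) // 0 confirms the data is partitioned by userid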

For one of my large datasets (3 TB) I created a table with 1000 partitions, which created literally around a million files! That exceeds the directory item limit of 1048576 and gives org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException

Question

What does the number of files created depend on?

Question

Is there a way to limit the number of files created?

Question

Should I worry about these files? Does having all these files hurt the performance of df2? It is always said that we should not create too many partitions because it is problematic.

Question

I found this info, HIVE Dynamic Partitioning tips, which says the number of files might be related to the number of mappers. It suggests using DISTRIBUTE BY while inserting into a Hive table. How could I do that in Spark?
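For context, a minimal sketch of what that could look like on the Spark side (the view name below is illustrative, not from the original post):

import org.apache.spark.sql.functions.col

// DataFrame API: repartition by the bucket column so that all rows for a
// given userid are shuffled to the same write task before saving.
val distributed = df0.repartition(50, col("userid"))

// Spark SQL also supports DISTRIBUTE BY directly, given a registered view.
df0.createOrReplaceTempView("myParquetView") // illustrative view name
val distributedSql = spark.sql("SELECT * FROM myParquetView DISTRIBUTE BY userid")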

Question

If the problem is indeed as described in the link above, then here, How to control the file numbers of hive table after inserting data on MapR-FS, they suggest using options such as hive.merge.mapfiles or hive.merge.mapredfiles to merge all the small files after the map reduce job. Are there options for this in Spark?
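As far as I know Spark has no direct counterpart to those hive.merge.* options; the usual pattern is an explicit compaction pass that reads the small files back and rewrites them with fewer write tasks (a sketch only; the output path is illustrative, and rewriting a bucketed table this way would need bucketBy again on the write):

// Generic compaction: read the small files and rewrite them as ~50 larger files.
// coalesce(50) shrinks the number of write tasks without a full shuffle.
spark.read
  .parquet("hdfs://XXX.XX.X.XX/myParquetFile")
  .coalesce(50)
  .write
  .mode("overwrite")
  .parquet("hdfs://XXX.XX.X.XX/myParquetFileCompacted") // illustrative output path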

Solution

Please use Spark SQL, which will use the HiveContext to write data into the Hive table, so it will use the number of buckets which you have configured in the table schema.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  config("hive.execution.engine", "tez").
  config("hive.exec.max.dynamic.partitions", "400").
  config("hive.exec.max.dynamic.partitions.pernode", "400").
  config("hive.enforce.bucketing", "true").
  config("hive.optimize.sort.dynamic.partition", "true").
  config("hive.vectorized.execution.enabled", "true").
  config("hive.enforce.sorting", "true").
  enableHiveSupport().getOrCreate()

// The parquet data must be visible to Spark SQL (for example as a temporary
// view named myParquetFile) before it can be referenced in this INSERT.
spark.sql("insert into hiveTableName partition (partition_column) select * from myParquetFile")

Spark's bucketing implementation does not honor the specified number of buckets when it comes to the file count. Each partition writes its own set of files, hence you end up with a lot of files for each bucket.
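Building on that explanation, a workaround that is often used on the Spark side (my sketch, not part of the original answer) is to repartition on the bucketing column before the bucketed write, so that each bucket's rows sit in a single task:

import org.apache.spark.sql.functions.col

// Shuffle into 50 partitions keyed on userid before bucketing; each write task
// then holds (roughly) one bucket's rows, so the output drops to about one
// file per bucket instead of tasks x buckets files.
df0.repartition(50, col("userid"))
   .write
   .bucketBy(50, "userid")
   .saveAsTable("myHiveTable")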

Please refer to this link: https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil

Hope this helps.

Ravi
