Merge multiple small files into a few larger files in Spark


Problem Description

I am using Hive through Spark. I have an INSERT into a partitioned table query in my Spark code. The input data is 200+ GB. When Spark writes to the partitioned table, it spits out very small files (files in KBs), so the output partitioned-table folder now has 5000+ small KB files. I want to merge these into a few larger MB-sized files, maybe a few files of about 200 MB each. I tried using the Hive merge settings, but they don't seem to work.

val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")
val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")
val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")
val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")
val result7F = hiveContext.sql("SET hive.merge.sparkfiles=true")
val result7G = hiveContext.sql("set hive.aux.jars.path=c:\\Applications\\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")
val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")

The above Hive settings work in a MapReduce Hive execution and produce files of the specified size. Is there any option to do this in Spark or Scala?

Solution

You may want to try using the DataFrame.coalesce method; it returns a DataFrame with the specified number of partitions (each of which becomes a file on insertion). So using the number of records you are inserting and the typical size of each record, you can estimate how many partitions to coalesce to if you want files of ~200MB.
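A minimal sketch of that approach, assuming the table and column names from the question (partition_json_table, partition_table, columns a, b, c) and a rough average record size that you would need to measure on your own data:

val targetFileBytes = 200L * 1024 * 1024   // desired output file size (~200 MB)
val avgRecordBytes  = 1024L                // assumed average record size -- measure this on your data

// Read the source rows once so we can count them and then compact before inserting.
val source = hiveContext.sql("select a, b, c from partition_json_table")

// Estimate how many partitions give ~200 MB per file.
val recordCount   = source.count()
val numPartitions = math.max(1, ((recordCount * avgRecordBytes) / targetFileBytes).toInt)

// coalesce reduces the number of partitions without a full shuffle;
// fewer partitions means fewer (and therefore larger) output files on insert.
val compacted = source.coalesce(numPartitions)

compacted.registerTempTable("compacted_source")
hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a, b, c from compacted_source")

Note that with dynamic partitioning the total file count is roughly numPartitions multiplied by the number of Hive partitions each task touches, so you may still want to tune the estimate downward if your data spans many date partitions.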
