Why are Spark Parquet files for an aggregate larger than the original?

Problem description

I am trying to create an aggregate file for end users to utilize, to avoid having them process multiple sources with much larger files. To do that I:

A) iterate through all source folders, stripping out the 12 most commonly requested fields and spinning out Parquet files into a new location where these results are co-located.

B) go back through the files created in step A and re-aggregate them by grouping on the 12 fields, reducing them to a summary row for each unique combination.

What I'm finding is that step A reduces the payload 5:1 (roughly 250 gigs becomes 48.5 gigs). Step B, however, instead of reducing this further, increases it by 50% over step A. My counts match, though.

This is using Spark 1.5.2.

My code, modified only to replace the field names with field1...field12 to make it more readable, is below with the results I've noted.

While I don't necessarily expect another 5:1 reduction, I don't know what I'm doing incorrectly that increases the storage size for fewer rows with the same schema. Is anyone able to help me understand what I did wrong?

Thanks!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9: string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true) (20 characters)
 |-- field11: string (nullable = true) (14 characters)
 |-- field12: string (nullable = true) (14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)


Recommended answer

In general, columnar storage formats like Parquet are highly sensitive to data distribution (data organization) and to the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage.
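
To see this effect in isolation, here is a minimal sketch (hypothetical demo data, column name and output paths, using the same sqlContext API as the question) that writes identical rows twice, once in random order and once sorted by a low-cardinality column, so the resulting Parquet sizes can be compared:

import org.apache.spark.sql.functions._

//hypothetical demo data: 10 million rows of a string column with only 50 distinct values
val demo = sqlContext.range(0L, 10000000L)
  .select(concat(lit("segment_"), pmod(col("id"), lit(50L)).cast("string")).as("category"))

//rows in random order: equal "category" values are interleaved, so the run-length
//encoding of Parquet's dictionary indices finds almost no runs to collapse
demo.orderBy(rand(42L)).write.parquet("/tmp/parquet_shuffled")

//rows sorted by the column: the same values sit in long runs, which run-length
//encoding collapses, so this copy is typically much smaller on disk
demo.orderBy("category").write.parquet("/tmp/parquet_sorted")

//compare the two output directories, e.g. du -sh /tmp/parquet_shuffled /tmp/parquet_sorted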

Aggregation, like the one you apply, has to shuffle the data. When you check the execution plan you'll see it is using a hash partitioner. This means that the distribution after the aggregation can be less efficient than the distribution of the original data. At the same time, sum can reduce the number of rows but increases the cardinality of the rCount column.
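
For example, you can confirm the shuffle before writing by printing the plan of the aggregation query from the question (the exact operator names vary between Spark versions):

//in Spark 1.5.2 the physical plan shows an Exchange step with
//hashpartitioning(field1, ..., field12) -- the shuffle that rearranges rows
//and breaks up whatever organization the step-A files had
sqlContext.sql(sqlStatement).explain()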

You can try different tools to correct for that, but not all of them are available in Spark 1.5.2:


  • Sort the complete dataset by columns with low cardinality (quite expensive due to the full shuffle) or use sortWithinPartitions (a sketch of the first two options follows this list).
  • Use the partitionBy method of DataFrameWriter to partition the data by low-cardinality columns.
  • Use the bucketBy and sortBy methods of DataFrameWriter (Spark 2.0.0+) to improve data distribution using bucketing and local sorting.
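
Here is a minimal sketch of the first two options, assuming the aggregated DataFrame from the question and a purely hypothetical choice of field1 and field2 as the low-cardinality columns; the output paths mirror the placeholders in the question, and sortWithinPartitions (Spark 1.6+) and bucketBy/sortBy (Spark 2.0.0+, table-based write) are left out:

val aggregated = sqlContext.sql(sqlStatement)

//option 1: global sort on low-cardinality columns before writing; the full shuffle
//is expensive, but it places identical values next to each other so Parquet's
//dictionary and run-length encodings can work effectively
aggregated
  .orderBy("field1", "field2")
  .write.parquet("<a new aws folder>" + dt + "/")

//option 2: physically partition the output on a low-cardinality column; every
//field1=<value> directory then holds rows with a constant field1, and the
//remaining columns tend to be better organized within each partition
aggregated
  .write
  .partitionBy("field1")
  .parquet("<another aws folder>" + dt + "/")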
