Why are Spark Parquet files for an aggregate larger than the original?


Problem description

I am trying to create an aggregate file for end users to utilize, to avoid having them process multiple sources with much larger files. To do that I: A) iterate through all source folders, stripping out the 12 most commonly requested fields and spinning out Parquet files in a new location where these results are co-located; B) go back through the files created in step A and re-aggregate them, grouping by the 12 fields to reduce the data to a summary row for each unique combination.

What I'm finding is that step A reduces the payload 5:1 (roughly 250 gigs becomes 48.5 gigs). Step B, however, instead of reducing this further, increases it by 50% over step A. My row counts match, though.

This is using Spark 1.5.2.
My code, modified only to replace the field names with field1...field12 to make it more readable, is below along with the results I've noted.

While I don't necessarily expect another 5:1 reduction, I don't know what I'm doing incorrectly that increases the storage size for fewer rows with the same schema. Can anyone help me understand what I did wrong?

Thanks!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9: string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true) (20 characters)
 |-- field11: string (nullable = true) (14 characters)
 |-- field12: string (nullable = true) (14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)

Answer

In general, columnar storage formats like Parquet are highly sensitive to data distribution (how the data is organized) and to the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage.

An aggregation like the one you apply has to shuffle the data. When you check the execution plan you'll see that it uses a hash partitioner, which means that after aggregation the distribution can be less efficient than that of the original data. At the same time, sum can reduce the number of rows but increases the cardinality of the rCount column.
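For example, a minimal way to confirm the shuffle, using the aggregation query from the question (the variable name aggregated is just for illustration):

// Sketch: inspect the physical plan of the GROUP BY query before writing it out.
// In Spark 1.5.2 the plan should contain an Exchange step with hash partitioning,
// which is the shuffle described above.
val aggregated = sqlContext.sql(sqlStatement)
aggregated.explain()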

You can try different tools to correct for that, but not all of them are available in Spark 1.5.2 (a rough sketch of the first two options follows the list):

  • Sort the complete dataset by columns with low cardinality (quite expensive due to the full shuffle) or use sortWithinPartitions.
  • Use the partitionBy method of DataFrameWriter to partition the data by low-cardinality columns.
  • Use the bucketBy and sortBy methods of DataFrameWriter (Spark 2.0.0+) to improve data distribution using bucketing and local sorting.
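Here is a minimal sketch of the first two options, reusing the query and placeholder paths from the question. Which of the 12 fields actually have low cardinality is not stated, so field1 and field2 below are stand-ins; also note that sortWithinPartitions requires Spark 1.6+, which is why a plain sort is shown here.

val aggregated = sqlContext.sql(sqlStatement)

// Option 1: globally sort by (assumed) low-cardinality columns before writing,
// so Parquet's run-length/dictionary encoding sees long runs of repeated values.
aggregated.sort("field1", "field2").write.parquet("<a new aws folder>" + dt + "/")

// Option 2: partition the output by an (assumed) low-cardinality column;
// each field1 value gets its own directory and no longer has to be stored per row.
// (In practice this would target a separate output location.)
aggregated.write.partitionBy("field1").parquet("<a new aws folder>" + dt + "/")

Either approach trades extra shuffle and write time for better-organized output files; bucketBy and sortBy would only become an option after upgrading to Spark 2.0+.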

