Spark + Parquet + Snappy: Overall compression ratio drops after Spark shuffles data


Question

Hi community!

Please help me understand how to get a better compression ratio with Spark.

Let me describe the case:

  1. I have a dataset, let's call it product, on HDFS, which was imported using the Sqoop ImportTool as a Parquet file with the snappy codec. As a result of the import I have 100 files totalling 46 GB (du), with varying file sizes (min 11 MB, max 1.5 GB, avg ~500 MB). The total record count is a little over 8 billion, with 84 columns.

I'm doing a simple read/repartition/write with Spark, also using snappy, and as a result I'm getting:

~100 GB output size with the same file count, same codec, same record count and same columns.

Code snippet:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")

  2. Using parquet-tools, I looked at random files from both the ingested and the processed data, and they look like this:

Ingested:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns look like:
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

Processed:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"

AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...

row group 1:                    RC:6660100 TS:243047789 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

On the other hand, without a repartition, or when using coalesce, the size remains close to the ingested data size.
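
For comparison, a minimal sketch of the coalesce variant (the output path here is a placeholder, not one of the paths above): coalesce only merges existing partitions, so no shuffle happens, the row ordering produced by the Sqoop import survives, and the dictionary/run-length encoders keep working well.

productDF
  .coalesce(100)  // merges existing partitions without a shuffle, so data locality is preserved
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("/processed/product/coalesce_example")  // placeholder output path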

  3. Going forward, I did the following:

  • read the dataset and write it back with:

productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")

  • read the dataset, repartition it and write it back with:

    productDF
      .repartition(500)
      .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
      .option("compression", "none")
      .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
    

As a result: 80 GB without repartition and 283 GB with repartition, with the same number of output files.

80 GB parquet meta example:

    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
    

283 GB parquet meta example:

    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
    

It seems that Parquet itself (with its encodings?) reduces the data size a lot, even without compression. How? :)

I tried to read the uncompressed 80 GB, repartition it and write it back - and I got my 283 GB again.

The first question for me is: why am I getting a bigger size after the Spark repartition/shuffle?

The second is: how do I efficiently shuffle the data in Spark so that it benefits Parquet encoding/compression, if there is any such benefit?

In general, I don't want my data size to grow after Spark processing, even if I didn't change anything.

Also, I failed to find out whether there is any configurable compression level for snappy, e.g. -1 ... -9? As far as I know gzip has this, but what is the way to control this level in the Spark/Parquet writer?
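
(For reference, snappy itself exposes no compression level, so there is no -1 ... -9 equivalent; the closest lever in the Spark Parquet writer is the choice of codec. A hedged sketch of where that choice is made — the gzip switch and the output path are only illustrations, not something from the original post:)

productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "gzip")  // e.g. gzip instead of snappy: better ratio, more CPU
  .parquet("/processed/product/gzip_example")  // placeholder output path

// or session-wide:
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")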

Thanks for your help!

Answer

When you call repartition(n) on a DataFrame you are doing a round-robin partitioning. Any data locality that existed prior to the repartition is gone and the entropy has gone up, so the run-length and dictionary encoders, as well as the compression codecs, don't really have much to work with.

So when you repartition, you need to use the repartition(n, col) version. Give it a good column that preserves data locality.

Also, since you are probably optimizing your Sqooped tables for downstream jobs, you can sortWithinPartitions for faster scans.

    df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)
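
A possible variant, if the downstream queries filter on ranges of the column rather than on exact keys: repartitionByRange (available since Spark 2.3) keeps nearby values of the column in the same partition, which also plays well with the dictionary/run-length encoders. This is only a sketch building on the answer's example, reusing the same userId column:

df.repartitionByRange(100, $"userId")   // range partitioning: similar userId values stay together
  .sortWithinPartitions("userId")       // sorted within each partition for faster scans
  .write.parquet(...)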

