Partitioning a large skewed dataset in S3 with Spark's partitionBy method

Problem description

I am trying to write out a large partitioned dataset to disk with Spark and the partitionBy algorithm is struggling with both of the approaches I've tried.

The partitions are heavily skewed - some of the partitions are massive and others are tiny.

Problem 1:

When I use repartition before partitionBy, Spark writes every partition out as a single file, even the huge ones.

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

This takes forever to execute because Spark isn't writing the big partitions in parallel. If one of the partitions has 1TB of data, Spark will try to write the entire 1TB of data as a single file.
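
To see the skew, here is a minimal diagnostic sketch (not part of the original question; it reuses df and some_col from the snippet above). A hash repartition on the partition column alone sends every row with a given some_col value to the same task, so the largest row counts below correspond to the partitions that end up as single enormous files.

import org.apache.spark.sql.functions.spark_partition_id

// Rows per shuffle partition after repartition('some_col); a heavily skewed
// distribution means a single task is stuck writing one huge file by itself.
df.repartition($"some_col")
  .groupBy(spark_partition_id().as("shuffle_partition"))
  .count()
  .orderBy($"count".desc)
  .show(20, false)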

Problem 2:

When I don't use repartition, Spark writes out way too many files.

This code will write out an insane number of files.

df.write.partitionBy("some_col").parquet("partitioned_lake")

I ran this on a tiny 8 GB data subset and Spark wrote out 85,000+ files!

When I tried running this on a production data set, one partition that has 1.3 GB of data was written out as 3,100 files.
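
For a rough sense of where the file explosion comes from, here is a hedged back-of-the-envelope sketch (not from the question; it reuses df and some_col from above). Without a repartition, each write task emits a separate file for every some_col value it happens to hold, so the output can approach one file per task per distinct key.

// Worst-case estimate of how many files partitionBy can produce when no
// repartition precedes the write: every task may hold every distinct key.
val numTasks = df.rdd.getNumPartitions
val distinctKeys = df.select("some_col").distinct().count()
println(s"worst-case file count ~ ${numTasks * distinctKeys}")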

What I'd like

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

What is my best path forward?

Recommended answer

The simplest solution is to add one or more columns to repartition and explicitly set the number of partitions.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

Where:

  • numPartitions - should be an upper bound (the actual number can be lower) on the desired number of files written to a single partition directory (see the sketch after this list).
  • $"some_other_col" (and optional additional columns) should have high cardinality and be independent of $"some_col" (there should be no functional dependency between the two, and they shouldn't be highly correlated).

If the data doesn't contain such a column, you can use o.a.s.sql.functions.rand.

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")
