Partitioning a large skewed dataset in S3 with Spark's partitionBy method

Problem description

I am trying to write out a large partitioned dataset to disk with Spark and the partitionBy algorithm is struggling with both of the approaches I've tried.

The partitions are heavily skewed - some of the partitions are massive and others are tiny.

Problem 1:

When I use repartition before partitionBy, Spark writes each partition as a single file, even the huge ones.

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

This takes forever to execute because Spark isn't writing the big partitions in parallel. If one of the partitions has 1TB of data, Spark will try to write the entire 1TB of data as a single file.
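
What is going on, as a minimal diagnostic sketch (it reuses the illustrative column and path names above and assumes a spark-shell-like session; it is not part of the original question): repartition('some_col) hash-partitions by some_col alone, so all rows sharing a some_col value land in one shuffle partition, and partitionBy then writes that value's directory as a single file from a single task.

import org.apache.spark.sql.functions.spark_partition_id
import spark.implicits._   // for the ' and $ column syntax outside spark-shell

// Each some_col value maps to exactly one shuffle partition after the
// repartition, which is why each value's directory becomes one file.
df.repartition('some_col)
  .select($"some_col", spark_partition_id().as("pid"))
  .distinct()
  .groupBy("some_col")
  .count()    // expect 1 for every value, even the 1 TB one
  .show()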

Problem 2:

When I don't use repartition, Spark writes out way too many files.

This code will write out an insane number of files.

df.write.partitionBy("some_col").parquet("partitioned_lake")

I ran this on a tiny 8 GB data subset and Spark wrote out 85,000+ files!

When I tried running this on a production data set, one partition that has 1.3 GB of data was written out as 3,100 files.
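
The blow-up has a simple cause (a hedged explanation using the illustrative column name from above): without a preceding repartition, every input task writes its own file for each some_col value it happens to contain, so the file count can approach the number of input partitions times the number of distinct partition values. A quick back-of-the-envelope check:

// Rough worst-case estimate of the file count when partitionBy is used
// without a preceding repartition (column name is illustrative).
val numInputPartitions = df.rdd.getNumPartitions
val numDistinctValues  = df.select("some_col").distinct().count()
println(s"worst case ~ ${numInputPartitions * numDistinctValues} files")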

What I want

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

What is my best path forward?

Answer

The simplest solution is to add one or more columns to repartition and explicitly set the number of partitions.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

Where:

  • numPartitions - should be an upper bound (the actual number can be lower) on the desired number of files written to a single partition directory (see the sketch after this list).
  • $"some_other_col" (and optional additional columns) should have high cardinality and be independent of $"some_col" (there should be no functional dependency between the two, and they shouldn't be highly correlated).

If the data doesn't contain such a column, you can use o.a.s.sql.functions.rand.

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")
