pyspark: Efficiently have partitionBy write to same number of total partitions as original table


Problem description

I had a question that is related to pyspark's repartitionBy() function which I originally posted in a comment on this question. I was asked to post it as a separate question, so here it is:

I understand that df.partitionBy(COL) will write all the rows with each value of COL to their own folder, and that each folder will (assuming the rows were previously distributed across all the partitions by some other key) have roughly the same number of files as were previously in the entire table. I find this behavior annoying. If I have a large table with 500 partitions, and I use partitionBy(COL) on some attribute columns, I now have for example 100 folders which each contain 500 (now very small) files.

What I would like is the partitionBy(COL) behavior, but with roughly the same file size and number of files as I had originally.

As demonstration, the previous question shares a toy example where you have a table with 10 partitions and do partitionBy(dayOfWeek) and now you have 70 files because there are 10 in each folder. I would want ~10 files, one for each day, and maybe 2 or 3 for days that have more data.
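
For illustration, a minimal sketch of that toy setup (the paths and column name here are placeholders, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A table spread across 10 partitions, written with partitionBy on a day column
df = spark.read.parquet("/path/to/events").repartition(10)
df.write.partitionBy("dayOfWeek").parquet("/path/to/by_day")
# Each dayOfWeek folder now contains up to 10 small files (one per original
# partition), i.e. roughly 70 files for 7 days instead of ~10.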

Can this be easily accomplished? Something like df.write().repartition(COL).partitionBy(COL) seems like it might work, but I worry that (in the case of a very large table which is about to be partitioned into many folders) having to first combine it to some small number of partitions before doing the partitionBy(COL) seems like a bad idea.

Any advice is greatly appreciated!

Recommended answer

You've got several options. In my code below I'll assume you want to write in parquet, but of course you can change that.
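
A minimal sketch of the first option (hash-repartition on the column, then write with partitionBy); COL, numPartitions, and the output path are placeholders:

COL = "some_column"      # placeholder: the column the output is partitioned by
numPartitions = 50       # placeholder: pick based on total size / desired file size

(df.repartition(numPartitions, COL)
   .write
   .partitionBy(COL)
   .parquet("/path/to/output"))   # placeholder output path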

This will first use hash-based partitioning to ensure that a limited number of values from COL make their way into each partition. Depending on the value you choose for numPartitions, some partitions may be empty while others may be crowded with values -- for anyone not sure why, read this. Then, when you call partitionBy on the DataFrameWriter, each unique value in each partition will be placed in its own individual file.

Warning: this approach can lead to lopsided partition sizes and lopsided task execution times. This happens when values in your column are associated with many rows (e.g., a city column -- the file for New York City might have lots of rows), whereas other values are less numerous (e.g., values for small towns).
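
A minimal sketch of the second, sort-based option described below; again COL and the output path are placeholders:

(df.sort(COL)                      # global sort: range-partitions the data
   .write
   .partitionBy(COL)
   .parquet("/path/to/output"))    # placeholder output path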

This option works great when you want (1) the files you write to be of nearly equal size and (2) exact control over the number of files written. This approach first globally sorts your data and then finds splits that break up the data into k evenly-sized partitions, where k is specified in the spark config spark.sql.shuffle.partitions. This means that all rows with the same value of your sort key are adjacent to each other, but sometimes they'll span a split and end up in different files. Thus, if your use-case requires all rows with the same key to be in the same partition, don't use this approach.

There are two extra bonuses: (1) by sorting your data, its size on disk can often be reduced (e.g., sorting all events by user_id and then by time will lead to lots of repetition in column values, which aids compression), and (2) if you write to a file format that supports it (like Parquet), then subsequent readers can read the data optimally by using predicate push-down, because the Parquet writer will write the MAX and MIN values of each column in the metadata, allowing the reader to skip rows if the query specifies values outside of the partition's (min, max) range.
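
As a small illustration of bonus (2), assuming the data was sorted by user_id as above (the path is a placeholder), a filter like this lets the Parquet reader skip row groups whose (min, max) statistics exclude the value:

(spark.read.parquet("/path/to/output")    # placeholder: the sorted output written above
      .filter("user_id = 12345")
      .count())
# Because the data is sorted by user_id, most Parquet row groups have min/max
# statistics that exclude 12345, so the reader can skip them entirely.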

Note that sorting in Spark is more expensive than just repartitioning and requires an extra stage. Behind the scenes Spark will first determine the splits in one stage, and then shuffle the data into those splits in another stage.

If you're using Spark with Scala, then you can write a custom partitioner, which can get over the annoying gotchas of the hash-based partitioner. Not an option in pySpark, unfortunately. If you really want to write a custom partitioner in pySpark, I've found this is possible, albeit a bit awkward, by using rdd.repartitionAndSortWithinPartitions:

(df.rdd
   .keyBy(sort_key_function)  # convert each row to a (key, row) pair
   .repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS,
                                       partitionFunc=part_func)
   .values()  # drop the keys, keeping only the original rows
   .toDF()
   .write.parquet(writePath))
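
For completeness, hypothetical definitions of the placeholders used above (none of these are defined in the original answer):

N_WRITE_PARTITIONS = 50          # hypothetical target number of output partitions
writePath = "/path/to/output"    # hypothetical output location

def sort_key_function(row):
    # hypothetical: key each row by the column you want to group and sort on
    return row["some_column"]

def part_func(key):
    # hypothetical: return an int; Spark takes it modulo numPartitions
    # to pick the destination partition for the key
    return hash(key)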

Maybe someone else knows an easier way to use a custom partitioner on a dataframe in pyspark?
