How to calculate the best numberOfPartitions for coalesce?


Problem Description

So, I understand that in general one should use coalesce() when:

the number of partitions decreases due to a filter or some other operation that may result in reducing the original dataset (RDD, DF). coalesce() is useful for running operations more efficiently after filtering down a large dataset.

I also understand that it is less expensive than repartition, as it reduces shuffling by moving data only if necessary. My problem is how to define the parameter that coalesce takes (idealPartionionNo). I am working on a project that was passed to me from another engineer, and he was using the calculation below to compute the value of that parameter.
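
A minimal sketch of that distinction, assuming the plain RDD API and an existing RDD named rdd (repartition(n) is implemented as coalesce(n, shuffle = true), while coalesce defaults to shuffle = false and merges existing partitions through a narrow dependency):

// coalesce with the default shuffle = false merges existing partitions
// without a full shuffle; repartition always performs one.
val merged = rdd.coalesce(10)         // narrow dependency, no shuffle
val reshuffled = rdd.repartition(10)  // full shuffle, even distribution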

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR

This is then used with a partitioner object:

val partitioner = new HashPartitioner(idealPartionionNo)

but it is also used with:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)

Is this the right approach? What is the main idea behind the idealPartionionNo value computation? What is the REPARTITION_FACTOR? How do I generally work to define that?

Also, since YARN is responsible for identifying the available executors on the fly, is there a way of getting that number (AVAILABLE_EXECUTOR_INSTANCES) on the fly and using it to compute idealPartionionNo (i.e. replacing NO_OF_EXECUTOR_INSTANCES with AVAILABLE_EXECUTOR_INSTANCES)?
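
To make that part of the question concrete, here is a sketch of one possible dynamic lookup (an assumption, using SparkStatusTracker, which is available since Spark 2.0; getExecutorInfos includes the driver, hence the -1, and under dynamic allocation the value can change between calls):

// Query the executors currently registered with the driver instead of
// reading a static configuration value.
val AVAILABLE_EXECUTOR_INSTANCES = sc.statusTracker.getExecutorInfos.length - 1

// REPARTITION_FACTOR is kept as the same opaque constant as above.
val idealPartionionNo = AVAILABLE_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR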

Ideally, some actual examples of the form:

  • Here's a dataset (size);
  • Here's a number of transformations and possible reuses of an RDD/DF.
  • Here is where you should repartition/coalesce.
  • Assume you have n executors with m cores and a partition factor equal to k

then:

  • The ideal number of partitions is ==> ???

Also, if you can refer me to a nice blog that explains these I would really appreciate it.

Recommended Answer

In practice, the optimal number of partitions depends more on the data you have, the transformations you use, and the overall configuration than on the available resources.

  • If the number of partitions is too low, you'll experience long GC pauses, different types of memory issues, and, lastly, suboptimal resource utilization.
  • If the number of partitions is too high, the maintenance cost can easily exceed the processing cost. Moreover, if you use non-distributed reducing operations (like reduce in contrast to treeReduce), a large number of partitions results in a higher load on the driver; see the sketch below.
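
As a small illustration of the reduce vs. treeReduce point (a sketch, not from the original answer):

// With many partitions, reduce sends every partition's partial result
// straight to the driver; treeReduce first merges them on the executors
// in a tree of the given depth, lowering the load on the driver.
val rdd = sc.parallelize(1 to 1000000, numSlices = 2000)

val viaReduce = rdd.reduce(_ + _)                     // 2000 partial results hit the driver
val viaTreeReduce = rdd.treeReduce(_ + _, depth = 3)  // merged on the executors first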

You can find a number of rules which suggest oversubscribing partitions compared to the number of cores (a factor of 2 or 3 seems to be common) or keeping partitions at a certain size, but this doesn't take into account your own code:

  • If you allocate a lot, you may experience long GC pauses, and it is probably better to use smaller partitions.
  • If a certain piece of code is expensive, the shuffle cost can be amortized by higher concurrency.
  • If you have a filter, you can adjust the number of partitions based on the discriminative power of the predicate (you'll make different decisions if you expect to keep 5% of the data versus 99% of it); a sketch of this follows.
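
A minimal sketch of scaling the partition count by expected selectivity (expectedKeepFraction is a hypothetical estimate you would derive from your own data; the tuple RDD and the predicate are the ones from the question):

// Shrink the partition count in proportion to the fraction of rows the
// filter is expected to keep, with a floor of one partition.
val expectedKeepFraction = 0.05  // assumption: the predicate keeps ~5% of the rows
val targetPartitions = math.max(1, (rdd.getNumPartitions * expectedKeepFraction).toInt)

val filtered = rdd.filter(x => x._3 < 30).coalesce(targetPartitions)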

In my opinion:

  • With one-off jobs, keep a higher number of partitions to stay on the safe side (slower is better than failing).
  • With reusable jobs, start with a conservative configuration, then execute - monitor - adjust configuration - repeat (a monitoring sketch follows this list).
  • Don't try to use a fixed number of partitions based on the number of executors or cores. First understand your data and code, then adjust the configuration to reflect your understanding.
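
For the monitoring step, one simple check is the record count per partition, which exposes skew and empty partitions (a sketch, assuming any RDD named rdd):

// Count the records in each partition and print the largest ones.
val partitionSizes = rdd
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()

partitionSizes.sortBy(-_._2).take(10).foreach { case (idx, n) =>
  println(s"partition $idx: $n records")
}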

Usually, it is relatively easy to determine the amount of raw data per partition for which your cluster exhibits stable behavior (in my experience it is somewhere in the range of a few hundred megabytes, depending on the format, the data structure you use to load the data, and the configuration). This is the "magic number" you're looking for.
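
Turned into code, that rule of thumb is just a division (a sketch; both sizes here are assumptions, and totalInputBytes would come from your own knowledge of the dataset, e.g. the input files' size on disk):

// Derive a partition count from a target amount of raw data per partition.
val totalInputBytes = 512L * 1024 * 1024 * 1024   // assumption: ~512 GB of input
val targetBytesPerPartition = 256L * 1024 * 1024  // assumption: ~256 MB per partition

val numPartitions = math.max(1, (totalInputBytes / targetBytesPerPartition).toInt)  // 2048 here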

Some things you have to remember in general:

  • The number of partitions doesn't necessarily reflect data distribution. Any operation that requires a shuffle (*byKey, join, RDD.partitionBy, Dataset.repartition) can result in non-uniform data distribution. Always monitor your jobs for symptoms of significant data skew.
  • The number of partitions is, in general, not constant. Any operation with multiple dependencies (union, coGroup, join) can affect the number of partitions; see the union sketch below.
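
For example (a sketch of the union case; parallelized RDDs carry no partitioner, so their union simply concatenates the inputs' partitions):

// Without a common partitioner, union returns an RDD whose partitions
// are the concatenation of both inputs' partitions.
val a = sc.parallelize(1 to 100, numSlices = 4)
val b = sc.parallelize(1 to 100, numSlices = 6)

println(a.union(b).getNumPartitions)  // 10 = 4 + 6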
