How to calculate the best numberOfPartitions for coalesce?


Question

So, I understand that in general one should use coalesce() when:

the number of partitions decreases due to a filter or some other operation that may result in reducing the original dataset (RDD, DF). coalesce() is useful for running operations more efficiently after filtering down a large dataset.

I also understand that it is less expensive than repartition as it reduces shuffling by moving data only if necessary. My problem is how to define the parameter that coalesce takes (idealPartionionNo). I am working on a project which was passed to me from another engineer and he was using the below calculation to compute the value of that parameter.

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR

This is then used with a partitioner object:

val partitioner = new HashPartitioner(idealPartionionNo)

But also used with:

RDD.filter(x => x._3 < 30).coalesce(idealPartionionNo)

Is this the right approach? What is the main idea behind the idealPartionionNo value computation? What is the REPARTITION_FACTOR, and how do I generally go about defining it?

Also, since YARN is responsible for identifying the available executors on the fly, is there a way of getting that number (AVAILABLE_EXECUTOR_INSTANCES) at runtime and using it to compute idealPartionionNo (i.e. replacing NO_OF_EXECUTOR_INSTANCES with AVAILABLE_EXECUTOR_INSTANCES)?
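For example, something along these lines (just a sketch, assuming Spark 2.x where sc.statusTracker.getExecutorInfos is available; AVAILABLE_EXECUTOR_INSTANCES is the hypothetical value described above):

// Sketch only: query the executors registered right now instead of relying on
// the static spark.executor.instances setting. getExecutorInfos typically
// reports the driver as well, hence the "- 1" and the max(..., 1) guard.
val AVAILABLE_EXECUTOR_INSTANCES =
  math.max(sc.statusTracker.getExecutorInfos.length - 1, 1)

val idealPartionionNo =
  AVAILABLE_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR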

Ideally, some actual examples of the form:

  • Here's a dataset (size);
  • Here's a number of transformations and possible reuses of an RDD/DF;
  • Here is where you should repartition/coalesce;
  • Assume you have n executors with m cores and a partition factor equal to k.

Then:

  • The ideal number of partitions is ==> ???

Also, if you can refer me to a nice blog that explains these, I would really appreciate it.

Answer

In practice, the optimal number of partitions depends more on the data you have, the transformations you use, and the overall configuration than on the available resources.

  • If the number of partitions is too low you'll experience long GC pauses, different types of memory issues, and lastly suboptimal resource utilization.
  • If the number of partitions is too high then maintenance cost can easily exceed processing cost. Moreover, if you use non-distributed reducing operations (like reduce in contrast to treeReduce), a large number of partitions results in a higher load on the driver (see the sketch below).
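To make the reduce vs. treeReduce point concrete, a minimal sketch (the RDD and the partition count here are made up purely for illustration):

// With many partitions, reduce sends every partition's partial result straight
// to the driver, while treeReduce first combines them in intermediate stages.
val numbers = sc.parallelize(1L to 10000000L, numSlices = 2000)

val viaReduce     = numbers.reduce(_ + _)                  // 2000 partial results hit the driver at once
val viaTreeReduce = numbers.treeReduce(_ + _, depth = 3)   // combined in a small tree of stages first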

You can find a number of rules which suggest oversubscribing partitions compared to the number of cores (a factor of 2 or 3 seems to be common) or keeping partitions at a certain size, but this doesn't take your own code into account:

  • If you allocate a lot, you can expect long GC pauses and it is probably better to go with smaller partitions.
  • If a given piece of code is expensive, then the shuffle cost can be amortized by a higher concurrency.
  • If you have a filter, you can adjust the number of partitions based on the discriminative power of the predicate (you will make different decisions if you expect to retain 5% of the data versus 99% of the data).

In my opinion:

  • With one-off jobs, keep a higher number of partitions to stay on the safe side (slower is better than failing).
  • With reusable jobs, start with a conservative configuration, then execute, monitor, adjust the configuration, and repeat.
  • Don't try to use a fixed number of partitions based on the number of executors or cores. First understand your data and code, then adjust the configuration to reflect your understanding.

Usually, it is relatively easy to determine the amount of raw data per partition for which your cluster exhibits stable behavior (in my experience it is somewhere in the range of a few hundred megabytes, depending on the format, the data structure you use to load the data, and the configuration). This is the "magic number" you're looking for.
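As a rough sketch of how that magic number could be turned into a partition count (TARGET_PARTITION_BYTES, the byte figures and rdd are placeholders for illustration, not recommendations):

// Assumption: roughly 256 MB of raw data per partition was measured to behave
// well on this particular cluster; inputBytes would come from the file system
// or from a size estimate of the filtered data.
val TARGET_PARTITION_BYTES = 256L * 1024 * 1024
val inputBytes             = 512L * 1024 * 1024 * 1024    // e.g. ~512 GB of raw input

val partitionsFromSize =
  math.max(math.ceil(inputBytes.toDouble / TARGET_PARTITION_BYTES).toInt, 1)

// rdd stands in for whatever RDD is being filtered in the question
rdd.filter(x => x._3 < 30).coalesce(partitionsFromSize)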

Some things you have to remember in general:

  • Number of partitions doesn't necessarily reflect data distribution. Any operation that requires a shuffle (*byKey, join, RDD.partitionBy, Dataset.repartition) can result in non-uniform data distribution. Always monitor your jobs for symptoms of a significant data skew (see the sketch after this list).
  • Number of partitions in general is not constant. Any operation with multiple dependencies (union, coGroup, join) can affect the number of partitions.
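One cheap way to spot that kind of skew is to compare per-partition record counts after the shuffle, for example (shuffled here stands for any RDD produced by one of the operations above):

// Count records per partition and compare the extremes; a large gap between
// max and min usually means the data is badly skewed across partitions.
val countsPerPartition = shuffled
  .mapPartitions(it => Iterator(it.size))
  .collect()

println(s"partitions=${countsPerPartition.length} " +
        s"min=${countsPerPartition.min} max=${countsPerPartition.max}")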

