Spark: get number of cluster cores programmatically
Question
I run my Spark application in a YARN cluster. In my code I use the number of available cores of the queue to create partitions on my dataset:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
My question: how can I get the number of available cores of the queue programmatically, rather than from configuration?
Answer
There are ways to get both the number of executors and the number of cores in a cluster from Spark. Here is a bit of Scala utility code that I've used in the past. You should easily be able to adapt it to Java. There are two key ideas:
- The number of workers is the number of executors minus one, or sc.getExecutorStorageStatus.length - 1.
- The number of cores per worker can be obtained by executing java.lang.Runtime.getRuntime.availableProcessors on a worker.
The rest of the code is boilerplate for adding convenience methods to SparkContext using Scala implicits. I wrote the code for Spark 1.x years ago, which is why it is not using SparkSession.
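If you are on Spark 2.x or later and only have a SparkSession, you can still get at the SparkContext via spark.sparkContext. Here is a minimal sketch of that route; it assumes a SparkSession named spark, and it uses sc.statusTracker.getExecutorInfos as the executor listing in case getExecutorStorageStatus is not available in your release (whether the driver shows up in that listing, and hence the -1, is an assumption worth checking on your version):

val sc = spark.sparkContext

// The status tracker is assumed to list the driver as well, mirroring the
// getExecutorStorageStatus.length - 1 trick above; verify on your Spark version.
val executorCount = sc.statusTracker.getExecutorInfos.length - 1

// Run a trivial one-element job so availableProcessors executes on an executor.
val coresPerExecutor =
  sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head

val totalCores = executorCount * coresPerExecutor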
One final point: it is often a good idea to coalesce to a multiple of your cores as this can improve performance in the case of skewed data. In practice, I use anywhere between 1.5x and 4x, depending on the size of data and whether the job is running on a shared cluster or not.
import org.apache.spark.SparkContext

import scala.language.implicitConversions

class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor
}

object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  // Runs a trivial one-element job so availableProcessors executes on an
  // executor, then caches the result for subsequent calls.
  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        _coresPerExecutor =
          sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }
}
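To tie this back to the question, here is a usage sketch of the enrichment together with the multiplier suggestion above. It assumes the code above is on the classpath, a SparkContext named sc, the Dataset ds from the question, and a hypothetical multiplier of 2 picked from the 1.5x-4x range:

import RichSparkContext.implicits._

// Hypothetical multiplier from the 1.5x-4x range suggested above.
val multiplier = 2
val targetPartitions = sc.coreCount * multiplier

// Shrink the Dataset from the question down to a multiple of the core count.
val balanced = ds.coalesce(targetPartitions)

Keep in mind that coalesce only reduces the number of partitions; if the target is larger than the current partition count, repartition would be needed instead.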