Spark: get number of cluster cores programmatically


Question

I run my Spark application on a YARN cluster. In my code I use the number of available cores of the queue to create partitions on my dataset:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());

My question: how can I get the number of available cores of the queue programmatically, rather than from configuration?

Answer

There are ways to get both the number of executors and the number of cores in a cluster from Spark. Here is a bit of Scala utility code that I've used in the past. You should easily be able to adapt it to Java. There are two key ideas:


1. The number of workers is the number of executors minus one, or sc.getExecutorStorageStatus.length - 1.

2. The number of cores per worker can be obtained by executing java.lang.Runtime.getRuntime.availableProcessors on a worker (see the sketch after this list).
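
As a rough inline sketch of these two ideas (assuming a live SparkContext named sc, and the same Spark 1.x-era API used in the utility class below):

// One entry in getExecutorStorageStatus is the driver, so subtract it
val executorCount = sc.getExecutorStorageStatus.length - 1

// Run a one-element job so availableProcessors is evaluated on a worker, not on the driver
val coresPerExecutor = sc.range(0, 1)
  .map(_ => java.lang.Runtime.getRuntime.availableProcessors)
  .collect
  .head

val totalCores = executorCount * coresPerExecutor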

The rest of the code below is boilerplate for adding convenience methods to SparkContext using Scala implicits. I wrote the code for Spark 1.x years ago, which is why it is not using SparkSession.

One final point: it is often a good idea to coalesce to a multiple of your cores, as this can improve performance in the case of skewed data. In practice, I use anywhere between 1.5x and 4x, depending on the size of the data and whether the job is running on a shared cluster or not.

import org.apache.spark.SparkContext

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      // Compute once by running a one-element job on an executor, then cache the result
      if (_coresPerExecutor == 0)
        _coresPerExecutor =
          sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }

}
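
For completeness, a minimal usage sketch that ties this back to the coalesce call in the question (the SparkSession setup and the 2x multiplier are illustrative only; pick a factor in the 1.5x-4x range mentioned above):

import org.apache.spark.sql.SparkSession
import RichSparkContext.implicits._

val spark = SparkSession.builder.appName("core-count-demo").getOrCreate()
val sc = spark.sparkContext

// Total cores across all executors, via the implicit enrichment above
val totalCores = sc.coreCount

// Coalesce to a small multiple of the core count instead of reading it from configuration
val ds = spark.range(0, 1000000).toDF("id")
val coalesced = ds.coalesce(totalCores * 2)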

