getExecutorMemoryStatus().size() not outputting correct num of executors

Problem description

In short, I need the number of executors/workers in the Spark cluster, but using sc._jsc.sc().getExecutorMemoryStatus().size() gives me 1 when in fact there are 12 executors.

With more details: I'm trying to determine the number of executors and use that number as the number of partitions I ask Spark to distribute my RDD across. I do this to leverage the parallelism, as my initial data is just a range of numbers, but every one of them then gets processed in an rdd#foreach method. The processing is both memory- and compute-heavy, so I want the initial range of numbers to reside in as many partitions as there are executors, to allow all executors to process chunks of it simultaneously.

Reading the comment in this question and seeing the documentation for Scala's getExecutorMemoryStatus, the suggested command sc._jsc.sc().getExecutorMemoryStatus().size() seemed reasonable. But for some reason I get the answer 1 no matter how many executors actually exist (in my last run it was 12).

Am I doing something wrong there? Am I calling the wrong method? In the wrong way?

I am running on a standalone Spark cluster that is started fresh for each run of the application.

Here is a minimal example of the problem:

from pyspark import SparkConf, SparkContext
import datetime


def print_debug(msg):
    dbg_identifier = 'dbg_et '
    print(dbg_identifier + str(datetime.datetime.now()) + ':  ' + msg)


print_debug('*****************before configuring sparkContext')
conf = SparkConf().setAppName("reproducing_bug_not_all_executors_working")
sc = SparkContext(conf=conf)
print_debug('*****************after configuring sparkContext')


def main():
    executors_num = sc._jsc.sc().getExecutorMemoryStatus().size()
    list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)
    print_debug('line before loop_a_lot. Number of partitions created={0}, '
                'while number of executors is {1}'
                .format(list_rdd.getNumPartitions(), executors_num))
    list_rdd.foreach(loop_a_lot)
    print_debug('line after loop_a_lot')


def loop_a_lot(x):
    y = x
    print_debug('started working on item %d at ' % x + str(datetime.datetime.now()))
    for i in range(100000000):
        y = y*y/6+5
    print_debug('--------------------finished working on item %d at ' % x + str(datetime.datetime.now())
      + 'with a result: %.3f' % y)

if __name__ == "__main__":
    main()

And to show the problem: the last time I ran it, I got the following in the driver's output (pasting only the relevant parts, with placeholders instead of the real IPs and ports):

$> grep -E 'dbg_et|Worker:54 - Starting Spark worker' slurm-<job-num>.out
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port1> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port2> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port3> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port4> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port5> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port6> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port7> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port8> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port9> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port10> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port11> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port12> with 10 cores, 124.9 GB RAM
dbg_et 2018-07-14 20:48:37.044785:  *****************before configuring sparkContext
dbg_et 2018-07-14 20:48:38.708370:  *****************after configuring sparkContext
dbg_et 2018-07-14 20:48:39.046295:  line before loop_a_lot. Number of partitions created=1, while number of executors is 1
dbg_et 2018-07-14 20:50:11.181091:  line after loop_a_lot

And in the worker_dir, Spark made a new directory for the run, which has 12 subdirectories. Only one of them (this time it was directory 5) has a copy of the script and a non-empty output, which makes sense: the misread number of executors, 1, made Spark create the RDD in one partition only. Here is the full output of that worker (this output is actually the stderr; I have no idea why it isn't in the stdout as it should be):

dbg_et 2018-07-14 20:48:41.806805:  started working on item 1 at 2018-07-14 20:48:41.806733
dbg_et 2018-07-14 20:48:59.710258:  --------------------finished working on item 1 at 2018-07-14 20:48:59.710198
with a result: inf
dbg_et 2018-07-14 20:48:59.710330:  started working on item 2 at 2018-07-14 20:48:59.710315
dbg_et 2018-07-14 20:49:17.367545:  --------------------finished working on item 2 at 2018-07-14 20:49:17.367483
with a result: inf
dbg_et 2018-07-14 20:49:17.367613:  started working on item 3 at 2018-07-14 20:49:17.367592
dbg_et 2018-07-14 20:49:35.382441:  --------------------finished working on item 3 at 2018-07-14 20:49:35.381597
with a result: inf
dbg_et 2018-07-14 20:49:35.382517:  started working on item 4 at 2018-07-14 20:49:35.382501
dbg_et 2018-07-14 20:49:53.227696:  --------------------finished working on item 4 at 2018-07-14 20:49:53.227615
with a result: inf
dbg_et 2018-07-14 20:49:53.227771:  started working on item 5 at 2018-07-14 20:49:53.227755
dbg_et 2018-07-14 20:50:11.128510:  --------------------finished working on item 5 at 2018-07-14 20:50:11.128452
with a result: inf

Can someone help me understand what causes the problem? Any idea? Might it be because of Slurm? (As you can see from the way I grepped the driver's output file, I am running Spark on top of Slurm, since the cluster I have access to is managed by it.)

Answer

Short fix: Allow time (e.g. add a sleep command) before you use defaultParallelism or _jsc.sc().getExecutorMemoryStatus() if you use either at the beginning of the application's execution.
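
As a minimal sketch of that fix (my own illustration, not part of the original code: the app name, the 15-second wait, and the "- 1" adjustment are assumptions, the latter based on the driver being counted in getExecutorMemoryStatus(), as discussed below):

import time

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("count_executors_after_wait")
sc = SparkContext(conf=conf)

# Give the workers a few seconds to register with the driver before counting.
time.sleep(15)

# getExecutorMemoryStatus() appears to include the driver itself, hence the "- 1";
# max(..., 1) guards against the case where no executor has registered yet.
executors_num = max(sc._jsc.sc().getExecutorMemoryStatus().size() - 1, 1)
list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)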

Explanation: There seems to be a short period of time at startup when there is only one executor (I believe that single executor is the driver, which in some contexts is counted as an executor). That's why using sc._jsc.sc().getExecutorMemoryStatus() at the top of the main function yielded the wrong number for me. The same happened with defaultParallelism (1).
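
If you know how many executors to expect (12 in your case), a fixed sleep can be replaced by polling until they have all registered. This is a rough sketch of my own (wait_for_executors is a hypothetical helper, the expected count of 12 is just this cluster's size, and the "- 1" assumes the driver shows up in getExecutorMemoryStatus(), as the output below suggests):

import time


def wait_for_executors(sc, expected, timeout=60.0, poll_interval=1.0):
    """Poll until `expected` executors have registered with the driver,
    or until `timeout` seconds have passed; return the last observed count."""
    count = 0
    deadline = time.time() + timeout
    while time.time() < deadline:
        # Subtract 1 because the driver itself appears in the status map.
        count = sc._jsc.sc().getExecutorMemoryStatus().size() - 1
        if count >= expected:
            break
        time.sleep(poll_interval)
    return count


executors_num = max(wait_for_executors(sc, expected=12), 1)
list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)

This avoids waiting longer than necessary when the workers register quickly, and it makes the startup-timing assumption explicit instead of hiding it in a magic sleep value.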

My suspicion is that the driver starts working, using itself as a worker, before all the workers have connected to it. This agrees with the fact that submitting the code below to spark-submit with --total-executor-cores 12

import time

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("app_name")
sc = SparkContext(conf=conf)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

log.warn('defaultParallelism={0}, and size of executorMemoryStatus={1}'
         .format(sc.defaultParallelism,
                 sc._jsc.sc().getExecutorMemoryStatus().size()))
time.sleep(15)
log.warn('After 15 seconds: defaultParallelism={0}, and size of executorMemoryStatus={1}'
         .format(sc.defaultParallelism,
                 sc._jsc.sc().getExecutorMemoryStatus().size()))
# spark_context_holder.getParallelismAlternative() is a helper of my own, not a
# PySpark API; judging by the 36 partitions in the output below, it returned 12
# here, giving 12 * 3 = 36 partitions.
rdd_collected = (sc.parallelize([1, 2, 3, 4, 5] * 200,
                                spark_context_holder.getParallelismAlternative() * 3)
                 .map(lambda x: (x, x*x) * 2)
                 .map(lambda x: x[2] + x[1])
                 )
log.warn('Made rdd with {0} partitioned. About to collect.'
         .format(rdd_collected.getNumPartitions()))
rdd_collected.collect()
log.warn('And after rdd operations: defaultParallelism={0}, and size of executorMemoryStatus={1}'
         .format(sc.defaultParallelism,
                 sc._jsc.sc().getExecutorMemoryStatus().size()))

gave me the output below:

> tail -n 4 slurm-<job number>.out
18/09/26 13:23:52 WARN dbg_et: defaultParallelism=2, and size of executorMemoryStatus=1
18/09/26 13:24:07 WARN dbg_et: After 15 seconds: defaultParallelism=12, and size of executorMemoryStatus=13
18/09/26 13:24:07 WARN dbg_et: Made rdd with 36 partitioned. About to collect.
18/09/26 13:24:11 WARN dbg_et: And after rdd operations: defaultParallelism=12, and size of executorMemoryStatus=13

Also, checking the time at which the worker directories were created, I saw that it was just after the correct values for both defaultParallelism and getExecutorMemoryStatus().size() were recorded (2). The important thing is that this was quite a long time (~10 seconds) after the wrong values for these two parameters were recorded (compare the timestamp of the line with "defaultParallelism=2" above with the creation times of the directories below):

 > ls -l --time-style=full-iso spark/worker_dir/app-20180926132351-0000/
 <permission user blah> 2018-09-26 13:24:08.909960000 +0300 0/
 <permission user blah> 2018-09-26 13:24:08.665098000 +0300 1/
 <permission user blah> 2018-09-26 13:24:08.912871000 +0300 10/
 <permission user blah> 2018-09-26 13:24:08.769355000 +0300 11/
 <permission user blah> 2018-09-26 13:24:08.931957000 +0300 2/
 <permission user blah> 2018-09-26 13:24:09.019684000 +0300 3/
 <permission user blah> 2018-09-26 13:24:09.138645000 +0300 4/
 <permission user blah> 2018-09-26 13:24:08.757164000 +0300 5/
 <permission user blah> 2018-09-26 13:24:08.996918000 +0300 6/
 <permission user blah> 2018-09-26 13:24:08.640369000 +0300 7/
 <permission user blah> 2018-09-26 13:24:08.846769000 +0300 8/
 <permission user blah> 2018-09-26 13:24:09.152162000 +0300 9/

(1) Before starting to use getExecutorMemoryStatus() I tried using defaultParallelism, as you should, but it kept giving me the number 2. Now I understand this happens for the same reason: running on a standalone cluster, if the driver sees only 1 executor then defaultParallelism = 2, as can be seen in the documentation for spark.default.parallelism.

(2) I'm not sure why the values are correct BEFORE the directories are created, but I'm assuming the executors' startup order has them connecting to the driver before creating their directories.
