Spark is only using one worker machine when more are available

Problem description

I'm trying to parallelize a machine learning prediction task via Spark. I've used Spark successfully a number of times before on other tasks and have faced no issues with parallelization before.

In this particular task, my cluster has 4 workers. I'm calling mapPartitions on an RDD with 4 partitions. The map function loads a model from disk (a bootstrap script distributes all that is needed to do this; I've verified it exists on each slave machine) and performs prediction on data points in the RDD partition.

The code runs, but only utilizes one executor. The logs for the other executors say "Shutdown hook called". On different runs of the code, it uses different machines, but only one at a time.

How can I get Spark to use multiple machines at once?

I'm using PySpark on Amazon EMR via Zeppelin notebook. Code snippets are below.

%spark.pyspark

sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")

from ModelLoader import ModelLoader
from MyClassifier import MyClassifier

def load_models():
    # Load all models from the local path that the bootstrap script
    # distributed to every node.
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)

    models = model_loader.load_models()
    return models

def process_file(file_contents, models):
    # file_contents is a (filename, filetext) pair from wholeTextFiles.
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)

def process_partition(file_list):
    # Load the models once per partition, then predict for each file in it.
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred


all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

There are four tasks as expected, but they all run on the same executor!
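
One quick way to confirm this from the notebook (a diagnostic sketch added here for illustration, not part of the original question) is to have each partition report the host it runs on:

%spark.pyspark
import socket

# Illustrative check: each of the 4 partitions returns the hostname of the
# executor that processes it. If every entry is identical, all partitions are
# being handled by a single worker machine.
hosts = all_contents.mapPartitions(lambda part: [socket.gethostname()]).collect()
print(hosts)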

I have the cluster running and can provide logs as available in Resource Manager. I just don't know yet where to look.

Answer

The job has as many partitions as you specified, but they are being processed serially.

Executors

The job spins up the default number of executors; you can see this in the YARN ResourceManager. In your case all of the processing is done by one executor. If an executor has more than one core, it can run tasks in parallel, and on EMR you need to make the change below for an executor to get more than one core.

What is specifically happening in our case is that the data is small, so all of it is read by a single executor (i.e. on one node). Without the property below, that executor uses only a single core, so all of the tasks run serially.
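
To see what the executors are actually working with, you can print the relevant settings from the notebook. This is an illustrative sketch using standard Spark configuration keys, not something from the original answer:

%spark.pyspark
# If spark.executor.cores resolves to 1 (or is unset, which defaults to 1 on YARN),
# each executor can only run one task at a time.
print(sc.defaultParallelism)                                   # total task slots Spark assumes
print(sc.getConf().get("spark.executor.cores", "not set"))     # cores requested per executor
print(sc.getConf().get("spark.executor.instances", "not set")) # executors requested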

Setting the property

sudo vi /etc/hadoop/conf/capacity-scheduler.xml

and set the following property:

"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalcul‌​ator"

To make this property take effect, you have to restart YARN:

sudo hadoop-yarn-resourcemanager stop

and then start it again:

sudo hadoop-yarn-resourcemanager start

Once your job is submitted, check the YARN UI and the Spark UI.

In YARN you will see more cores assigned to the executor.
