Pyspark application only partly exploits Dataproc cluster resources


Problem description


My pyspark application runs a UDF over a 106.36 MB dataset (817,270 records), which takes about 100 hours with regular Python lambda functions. I have spawned a Google Dataproc cluster with 20 worker nodes, each with 8 vCPUs. However, upon execution only 3 nodes and 3 vCPUs in total are used. Obviously, I would like the cluster to use all the resources that I make available.


The default number of partitions of my resulting dataframe is 8. I tried repartitioning it to 100, but the cluster keeps using only 3 nodes and 3 vCPUs. Also, when I run a command to check the number of executors that Spark sees, it is only 3.


This is the pyspark code that gets executed:

from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf

# Turn the driver-side customer_names data into a Spark dataframe
customer_names = spark.createDataFrame(customer_names)

# Wrap embargoMatch in a UDF that returns a map of string keys to string values
embargo_match_udf = udf(lambda x, y: embargoMatch(x, y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name', 'customer_code'))

# Pull the 'max_jaro' entry out of the returned map
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])

# Write the result to Cloud Storage as CSV
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')

Here is some Spark output as seen from my Jupyter notebook:

print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
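
As a side note (not from the original post), the executor memory status map typically includes the driver as well, so a size of 3 is consistent with only 2 executors being allocated. A minimal diagnostic sketch, assuming a standard Dataproc/YARN setup, to see what the running context actually requested:

# Diagnostic sketch (assumed, not from the original post): inspect the live configuration.
# getExecutorMemoryStatus() usually counts the driver too, so 3 entries ~ 2 executors.
print(sc.defaultParallelism)
print(sc.getConf().get('spark.executor.instances', 'not set'))
print(sc.getConf().get('spark.dynamicAllocation.enabled', 'not set'))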

Answer


For those interested in how I solved the issue:


By default my Spark context assumed two worker nodes, no matter how many extra nodes I spawned in Google Cloud's Dataproc UI. Therefore I manually changed the Spark context as follows:

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

# Stop the pre-existing context that the notebook provides
sc.stop()

# Ask YARN for 5 executors with 4 cores each before creating the new context
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')

sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()
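
For readers who prefer to keep the settings in one place, the same effect can be sketched with an explicit SparkConf (an alternative setup under the same assumed values of 4 cores and 5 executors; not part of the original answer):

# Sketch of an equivalent setup via an explicit SparkConf
# (assumed values: 4 cores per executor, 5 executor instances).
sc.stop()
conf = SparkConf() \
    .setAppName('embargotest') \
    .set('spark.executor.cores', '4') \
    .set('spark.executor.instances', '5')
sc = SparkContext(master='yarn', conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()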


In addition, I explicitly repartitioned the customer_names dataset to 20 partitions (4 cores x 5 instances) before applying the .withColumn function to this dataframe.

customer_names = spark.createDataFrame(customer_names).repartition(20)
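
If you would rather not hard-code the 20, the partition count can also be derived from the executor settings (a small sketch, assuming the two properties were set as above):

# Sketch: derive the partition count from the configured executor layout
# instead of hard-coding 20 (assumes the properties above were set).
cores = int(sc.getConf().get('spark.executor.cores', '1'))
instances = int(sc.getConf().get('spark.executor.instances', '1'))
customer_names = spark.createDataFrame(customer_names).repartition(cores * instances)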


Hope this can help someone with a similar issue!
