Pyspark application only partly exploits Dataproc cluster resources
Problem description
My pyspark application runs a UDF over a 106.36 MB dataset (817,270 records), which takes about 100 hours with regular Python lambda functions. I have spawned a Google Dataproc cluster with 20 worker nodes, each with 8 vCPUs. However, upon execution only 3 nodes and 3 vCPUs in total are used. Obviously, I would like the cluster to use all the resources I make available.
The default number of partitions of my resulting dataframe is 8. I tried repartitioning it to 100, but the cluster keeps using only 3 nodes and 3 vCPUs. Also, when I run a command to check the number of executors that Spark sees, it reports only 3.
This is the pyspark code that gets executed:
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf

customer_names = spark.createDataFrame(customer_names)

# Wrap the Python function as a Spark UDF returning a string -> string map
embargo_match_udf = udf(lambda x, y: embargoMatch(x, y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name', 'customer_code'))

result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')
Here is some Spark output as seen from my Jupyter notebook:
print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
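A side note on that last line: getExecutorMemoryStatus() reports one entry per block manager, which includes the driver itself in addition to the executors. So a size of 3 actually corresponds to only 2 worker executors, which matches the "two worker nodes by default" behaviour described in the answer below. A plain-Python sanity check of that arithmetic (the value 3 is taken from the output above):

```python
# getExecutorMemoryStatus() counts the driver's block manager as well
# as the executors', so subtract 1 to get the executor count.
memory_status_size = 3  # value observed in the notebook output above
num_executors = memory_status_size - 1
print(num_executors)  # -> 2 executors
```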
Accepted answer
For those interested in how I solved the issue:
By default my Spark context assumed two worker nodes, no matter how many extra nodes I spawned in Google Cloud's Dataproc UI. Therefore I manually changed the Spark context as follows:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

# Stop the context that the notebook created with the default settings
sc.stop()

# Request 5 executors with 4 cores each before building the new context
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')

sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()
In addition, I explicitly repartitioned the customer_names dataframe to 20 partitions (4 cores x 5 instances) before applying the .withColumn function to it:
customer_names = spark.createDataFrame(customer_names).repartition(20)
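The 20 in that repartition call is not arbitrary: it is one partition per core the cluster can now run concurrently, so every task slot gets work. The arithmetic is pure Python, nothing Spark-specific, using the executor sizing chosen above:

```python
executor_instances = 5   # spark.executor.instances set above
executor_cores = 4       # spark.executor.cores set above
concurrent_tasks = executor_instances * executor_cores
print(concurrent_tasks)  # -> 20, matching the repartition(20) call
```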
Hope this can help someone with a similar issue!