Why is each Spark Task not utilizing all allocated cores?

Question

Assume I have 36 cores per executor, one executor per node, and 3 nodes, each with 48 cores available. The basic gist of what I've noticed is that when I set each task to use 1 core (the default), CPU utilization across the workers is about 70% and 36 tasks execute simultaneously per executor (as I would have expected). However, when I change the configuration to 6 cores per task (--conf spark.task.cpus=6), the concurrency drops to 6 tasks at a time per executor (as expected), but CPU utilization also drops below 10% (unexpected). I would have assumed that Spark would know how to parallelize the workload over the 6 cores.

The important implementation detail is that I am running a UDF on a column of a DataFrame and appending the result as a new column on that DataFrame. This UDF uses a @transient object that provides the machine learning algorithm I'm using. The UDF is not part of an aggregation or coalesce operation; it is just a map operation over the column, implemented like so:

import org.apache.spark.sql.functions.udf

def myUdf = udf { ... }  // body elided in the question; wraps the @transient ML object

val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)
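
The UDF body is elided above, so here is a minimal sketch of the shape described in the question: a @transient lazy val holding the model so it is re-created on each executor instead of being shipped with the closure. ScoringModel, score, and the string-length placeholder are hypothetical stand-ins, not the asker's actual code.

import org.apache.spark.sql.functions.udf

// Hypothetical holder: @transient keeps the (often non-serializable) model out
// of the task closure; the lazy val initializes it once per executor JVM.
object ScoringModel extends Serializable {
  @transient lazy val score: String => Double =
    (s: String) => s.length.toDouble   // placeholder for the real ML algorithm
}

// One call per record, executed sequentially within each task.
def myUdf = udf { s: String => ScoringModel.score(s) }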

I would have expected Spark to execute 6 copies of myUdf at a time, one per core, so that 6 records are processed at once, but this doesn't appear to be the case. Is there a way to fix this (without submitting a PR to the Spark project), or at least, can someone explain why this might be happening?

Anticipating the question: I'm experimenting with increasing the number of cores per task in order to reduce the amount of RAM required per executor, because in this case executing too many tasks at once sharply increases RAM usage.

Answer

spark.task.cpus is the number of cores to allocate for each task. It is used to give multiple cores to a single task when the user code is multi-threaded. If your udf doesn't use multiple threads (doesn't spawn multiple threads within a single function call), then those cores are simply wasted: with 36 cores per executor and spark.task.cpus=6, only 6 single-threaded UDF calls run at once, so at most 6 of the 36 cores do any work, which is consistent with the low CPU utilization you observed.

to process 6 records at a time

allocate 6 cores, with spark.task.cpus set to 1. If you want to limit the number of tasks per node, reduce the number of cores offered by each node.
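
As a minimal sketch of that recommendation (the app name is hypothetical, and it assumes the per-executor core count is controlled through spark.executor.cores on this cluster): keep spark.task.cpus at its default of 1 and cap concurrency, and therefore peak RAM, by offering fewer cores per executor.

import org.apache.spark.sql.SparkSession

// Keep spark.task.cpus at the default of 1 and limit how many single-core
// tasks run at once by reducing the cores offered per executor.
val spark = SparkSession.builder()
  .appName("limit-concurrent-tasks")     // hypothetical app name
  .config("spark.executor.cores", "6")   // at most 6 concurrent tasks per executor
  .config("spark.task.cpus", "1")        // the default, shown explicitly
  .getOrCreate()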

Essentially, Spark can determine on its own how to map a UDF over multiple records concurrently: it splits the records among the tasks (according to the partitioning) and decides how many simultaneous tasks each executor can handle. However, Spark can NOT automatically split the work across the cores within a single task. To use multiple cores per task, the UDF code, which is executed over one record at a time (sequentially) within each task, would have to be written to parallelize its own computation for a single record.
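
To illustrate that last point, here is a minimal sketch (not from the original answer) of a UDF that does spread its per-record work across threads, using Scala 2.12-style parallel collections over the pieces of a single record; the pieces column, expensiveScore, and the string-length placeholder are all hypothetical.

import org.apache.spark.sql.functions.{col, udf}

// Placeholder for whatever per-piece work the real ML algorithm does.
def expensiveScore(s: String): Double = s.length.toDouble

// The cores reserved by spark.task.cpus are only used if the UDF itself fans
// the work for one record out over threads, here via .par on the record's pieces.
val multiThreadedUdf = udf { pieces: Seq[String] =>
  pieces.par.map(expensiveScore).sum
}

val scored = dataFrame.withColumn("metric", multiThreadedUdf(col("pieces")))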
