Why is each Spark Task not utilizing all allocated cores?


Problem description

Assume I have 36 cores per executor, one executor per node, and 3 nodes, each with 48 cores available. The basic gist of what I've noticed is: when I set each task to use 1 core (the default), my CPU utilization over the workers is about 70% and 36 tasks execute simultaneously per executor (as I would have expected). However, when I change my configuration to 6 cores per task (--conf spark.task.cpus=6), tasks drop to 6 at a time per executor (as expected), but my CPU utilization also drops below 10% (unexpected). I would have assumed that Spark would know how to parallelize the workload over the 6 cores.
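
For reference, a minimal sketch of the configuration being described, written against the SparkSession builder instead of --conf flags (the application name is illustrative; the core counts come from the description above):

import org.apache.spark.sql.SparkSession

// Sketch of the setup described above: one executor per node, 36 cores
// per executor, and (in the slow variant) 6 cores reserved per task.
// With spark.task.cpus=1 (the default) each executor can run up to 36
// concurrent tasks; with spark.task.cpus=6 it can run only 36 / 6 = 6.
val spark = SparkSession.builder()
  .appName("udf-core-utilization")          // illustrative name
  .config("spark.executor.instances", "3")  // 3 nodes, one executor each
  .config("spark.executor.cores", "36")
  .config("spark.task.cpus", "6")           // set back to "1" for the default behaviour
  .getOrCreate()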

The important implementation detail is that I am running a UDF on a column of a DataFrame and appending the results as a new column on that DataFrame. This UDF uses a @transient object that provides the machine-learning algorithm I'm using. The UDF is not part of an aggregation or coalesce operation; it is just a map operation over the column, implemented like so:

def myUdf = udf { ... }

val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)
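
For context, a minimal sketch of what a UDF built around a @transient model object might look like; MyModel, score, and UdfHolder are hypothetical stand-ins for the actual algorithm:

import org.apache.spark.sql.functions.udf

// Hypothetical model type standing in for the actual ML algorithm.
class MyModel extends Serializable {
  def score(value: String): Double = value.length.toDouble  // placeholder logic
}

object UdfHolder extends Serializable {
  // @transient lazy val: the model is not shipped with the closure but
  // re-created lazily on each executor when first used.
  @transient lazy val model: MyModel = new MyModel

  // Invoked once per row, sequentially, within each task.
  def myUdf = udf { (s: String) => model.score(s) }
}

Usage then matches the snippet above, e.g. UdfHolder.myUdf(dataFrame.col("originalCol")).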

I would have expected Spark to run 6 copies of myUdf at a time, processing 6 records at once, one per core, but that doesn't appear to be the case. Is there a way to fix this (without submitting a PR to the Spark project), or can someone at least explain why this might be happening?

Anticipating the question: I'm experimenting with increasing the number of cores per task in order to reduce the amount of RAM required per executor. Executing too many tasks at once exponentially increases the RAM usage, in this instance.

Recommended answer

spark.task.cpus is the number of cores to allocate to each task. It is used to give a single task multiple cores in case the user code is multi-threaded. If your UDF doesn't use multiple threads (doesn't spawn multiple threads within a single function call), then the extra cores are simply wasted.
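
To make that concrete, spark.task.cpus = 6 only pays off when each UDF invocation spreads its own work over several threads, along the lines of the following sketch (the array-typed column and the scorePart helper are hypothetical):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.functions.udf

// Hypothetical helper standing in for one slice of the real computation.
def scorePart(chunk: Seq[Double]): Double = chunk.sum

// Sketch: a UDF over an array-typed column that fans each call out to
// 6 local threads, so a task configured with spark.task.cpus=6 would
// actually keep its cores busy. Creating a pool per call is purely for
// illustration; a shared executor-side pool would be preferable.
val multiThreadedUdf = udf { (values: Seq[Double]) =>
  val pool = Executors.newFixedThreadPool(6)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val chunks  = values.grouped(math.max(1, values.length / 6)).toSeq
    val futures = chunks.map(chunk => Future(scorePart(chunk)))
    Await.result(Future.sequence(futures), Duration.Inf).sum
  } finally {
    pool.shutdown()
  }
}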

To process 6 records at a time, allocate 6 cores with spark.task.cpus set to 1. If you want to limit the number of tasks on a node, reduce the number of cores offered by each node.
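
Put differently, a sketch of the suggested setup for keeping the number of concurrent tasks (and therefore memory) down, while still giving Spark single-core tasks, might look like this (the numbers and application name are illustrative):

import org.apache.spark.sql.SparkSession

// Sketch of the suggested alternative: keep single-core tasks, but offer
// fewer task slots per executor, so at most 6 tasks (and 6 copies of the
// per-task memory footprint) run concurrently on each node.
val spark = SparkSession.builder()
  .appName("udf-fewer-slots")           // illustrative name
  .config("spark.executor.cores", "6")  // 6 task slots per executor
  .config("spark.task.cpus", "1")       // default: one core per task
  .getOrCreate()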

Essentially, Spark can determine on its own how to map a UDF over multiple records concurrently, by splitting the records among the Tasks (according to the partitioning) and deciding how many simultaneous Tasks each Executor can handle. However, Spark can NOT automatically split the work of one Task across its cores. To utilize multiple cores per task, the code in the UDF, which is executed over one record at a time (sequentially) per Task, would need to be written to parallelize the computation for a single record.
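
Tying that back to the snippet in the question: with single-core tasks, the lever for keeping cores busy is the partition count, as in this sketch (the repartition figure simply mirrors the 3 nodes x 36 executor cores described above):

// Sketch: with spark.task.cpus=1, utilization is driven by how many tasks
// exist, i.e. by partition count. 3 nodes x 36 cores = 108 task slots, so
// make sure the DataFrame has at least that many partitions.
val repartitioned = dataFrame.repartition(3 * 36)

val resultSet = myUdf(repartitioned.col("originalCol"))
val dataFrameWithResults = repartitioned.withColumn("originalColMetric", resultSet)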
