Is CPU usage in Apache Spark limited?

Question

I recently discovered that adding parallel computation (e.g. using parallel collections) inside a UDF increases performance considerably, even when running Spark in local[1] mode or using YARN with 1 executor and 1 core.

E.g. in local[1] mode, the Spark job consumes as much CPU as possible (i.e. 800% if I have 8 cores, as measured with top).

This seems strange, because I thought Spark (or YARN) limits the CPU usage per Spark application.

So I wonder why that is, and whether it is recommended to use parallel processing/multi-threading inside Spark, or whether I should stick to Spark's own parallelization pattern.

Here is an example to play with (times measured in YARN client mode with 1 instance and 1 core):

case class MyRow(id:Int,data:Seq[Double])

// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()

df.show() // trigger computation and caching

// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d

val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)

df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds

Answer

Spark does not limit CPU usage directly; instead, it defines the number of concurrent task threads it creates. So with local[1] it basically runs one task at a time. When you do in.par.map{expensive}, you create threads that Spark does not manage and that are therefore not covered by this limit. In other words, you told Spark to limit itself to a single thread and then created other threads without Spark knowing about them.
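To make that concrete (a minimal sketch, not part of the original answer): the "limit" is just the number of task slots fixed when the SparkSession is created, e.g. through the master URL and spark.task.cpus, so nothing stops a UDF from spawning extra threads of its own.

import org.apache.spark.sql.SparkSession

// Spark's "limit" is only the number of task threads it schedules concurrently:
// local[1] -> one task thread in the driver JVM, local[8] -> up to eight.
val spark = SparkSession.builder()
  .master("local[1]")              // one task at a time
  .config("spark.task.cpus", "1")  // cores reserved per task (scheduling bookkeeping only)
  .appName("cpu-limit-demo")       // hypothetical app name, used only for this sketch
  .getOrCreate()

// Nothing here caps the CPU a task may actually burn: a UDF that starts its own
// threads (e.g. via .par) can still use every core of the machine.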

In general, it is not a good idea to spawn your own threads inside a Spark operation. Instead, it is better to tell Spark how many threads it may use and to make sure you have enough partitions to exploit that parallelism.
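For the example above, one way to follow that advice (a sketch, assuming it is acceptable to explode the array; the partition count of 8 and the names expensiveUDF/result are placeholders) is to let Spark parallelize over the array elements itself instead of using a parallel collection inside the UDF:

import org.apache.spark.sql.functions.{explode, sum, udf}

// Turn each array element into its own row so Spark can spread the work
// across task threads/executors, then aggregate back per id.
val expensiveUDF = udf((d: Double) => (1 to 10000).foldLeft(0.0){ case (a, b) => a * b } * d)

val result = df
  .withColumn("d", explode($"data"))   // one row per array element
  .repartition(8)                      // enough partitions for the cores you give Spark
  .withColumn("e", expensiveUDF($"d"))
  .groupBy($"id")
  .agg(sum($"e").as("sum"))

result.show()

Run this with local[8] (or more executors/cores on YARN) and Spark itself schedules the parallel work, rather than relying on threads it cannot see.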
