Parallelize / avoid foreach loop in Spark


Question

I wrote a class that gets a DataFrame, does some calculations on it, and can export the results. The DataFrames are generated from a list of keys. I know that I am doing this in a very inefficient way right now:

var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               // writes the Results into another Dataframe that is saved to HDFS
}

I think foreach on a Scala List is not parallel, so how can I avoid using foreach here? The calculations on the DataFrames could happen in parallel, since the results of one calculation are NOT input for the next DataFrame - how can I implement this?

Thank you very much!!

What I tried to do:

val l = List(34, 32, 132, 352)      // Scala List
var l_DF: List[DataFrame] = List()
l.foreach { i =>
    l_DF = DataContainer.getDataFrame(i) :: l_DF   // prepend DataFrame to the List of DataFrames
}

val rdd: RDD[DataFrame] = sc.parallelize(l_DF)     // try to distribute the DataFrames themselves
rdd.foreach { data =>
    val x = new MyClass(data)
}

But this gives:

Invalid tree; null:
null

Edit 2: Okay, I don't get how everything works under the hood...

1) Everything works fine when I execute this in spark-shell:

spark-shell --driver-memory 10g
//...
var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.calcSomething()
}

2) Error, when I start spark-shell with:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) When I try to parallelize it, I get an error, too:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// I can see the parallel execution in the console messages (my class prints some,
// and they are now printed in parallel instead of sequentially)

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
   org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

There are actually more than 10 executors, on 4 nodes. I never configure the SparkContext; it's already given on startup.

Answer

You can use Scala's parallel collections to achieve foreach parallelism on the driver side.

val l = List(34, 32, 132, 352).par
l.foreach { i =>
  // your code to be run in parallel, for each i
}
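
Applied to the loop from the question, that would look roughly like the sketch below. It assumes DataContainer and MyClass behave as described above; each iteration still submits its Spark jobs from the driver, only the driver-side loop now runs on multiple threads. The ForkJoinTaskSupport line is an optional, standard way to cap the thread count (4 is an arbitrary choice here):

import scala.collection.parallel.ForkJoinTaskSupport

val keys = List(34, 32, 132, 352).par
// optional: bound the driver-side parallelism instead of using the default pool
keys.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(4))

keys.foreach { i =>
    val data: DataFrame = DataContainer.getDataFrame(i)  // each thread fetches its own DataFrame
    val x = new MyClass(data)
    x.setSettings(...)                                   // settings elided, as in the question
    x.calcSomething()
    x.saveResults()
}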

However, a word of caution: is your cluster capable of running jobs in parallel? You may submit the jobs to your Spark cluster in parallel, but they may end up getting queued on the cluster and executed sequentially.
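
If the jobs do queue up, one knob worth checking (my suggestion, not part of the original answer) is the scheduling mode: within a single SparkContext, jobs run FIFO by default, and Spark's fair scheduler lets jobs submitted from separate threads share the executors. A minimal sketch, using the standard spark.scheduler.mode config; "myPool" is a hypothetical pool name:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g --conf spark.scheduler.mode=FAIR

// optionally, inside each driver-side thread, assign the jobs to a pool:
sc.setLocalProperty("spark.scheduler.pool", "myPool")   // "myPool" is illustrative only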

