Parallelize / avoid foreach loop in Spark


Problem Description

I wrote a class that gets a DataFrame, does some calculations on it, and can export the results. The DataFrames are generated from a list of keys. I know that I am doing this in a very inefficient way right now:

var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               // writes the Results into another Dataframe that is saved to HDFS
}

I think the foreach on the Scala list is not parallel, so how can I avoid using foreach here? The calculation of the DataFrames could happen in parallel, since the results of the calculations are NOT input for the next DataFrame. How can I implement this?

Thank you so much!!

Edit:

What I tried to do:

val l = List(34, 32, 132, 352)      // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
    DataContainer.getDataFrame(i)::l        //append DataFrame to List of Dataframes
}

val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
    val x = new MyClass(data)
)

but this gives:

Invalid tree; null:
null

Edit 2: Okay, I don't get how everything works under the hood...

1) Everything works fine when I execute this in spark-shell:

spark-shell --driver-memory 10g
//...
var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.calcSomething()
}

2) Error when I start the same with:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) When I try to parallelize it, I get an error, too:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// I can see the parallel execution from the console messages (my class prints some, and they now appear in parallel instead of sequentially)

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
   org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

There are actually more than 10 executors, on 4 nodes. I never configured the SparkContext; it's already provided on startup.

Solution

You can use Scala's parallel collections to achieve foreach parallelism on the driver side.

val l = List(34, 32, 132, 352).par
l.foreach { i =>
  // your code to be run in parallel for each i
}
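
Applied to the loop from the question, this might look like the following sketch. It reuses DataContainer.getDataFrame and MyClass from the question, and the setSettings arguments stay elided as in the original, so treat this as an illustration rather than runnable code:

val keys = List(34, 32, 132, 352).par   // parallel collection: foreach bodies run on driver threads

keys.foreach { i =>
  val data: DataFrame = DataContainer.getDataFrame(i) // get the DataFrame for this key
  val x = new MyClass(data)                           // one MyClass instance per key
  x.setSettings(...)
  x.calcSomething()                                   // each iteration submits its own Spark jobs
  x.saveResults()                                     // each result is written to HDFS independently
}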

*However, a word of caution: is your cluster capable of running jobs in parallel? You may submit the jobs to your Spark cluster in parallel, but they may end up getting queued on the cluster and executed sequentially.
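
If the queuing becomes a problem, one option worth looking at (assuming your application uses Spark's default FIFO scheduling mode, which serializes concurrently submitted jobs) is the fair scheduler, which lets jobs submitted from different threads share executors instead of running strictly one after another:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g \
  --conf spark.scheduler.mode=FAIR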
