Parallelize / avoid foreach loop in Spark

Question
I wrote a class that takes a DataFrame, does some calculations on it, and can export the results. The DataFrames are generated from a list of keys. I know that I am doing this in a very inefficient way right now:
var l = List(34, 32, 132, 352) // Scala List
l.foreach { i =>
  val data: DataFrame = DataContainer.getDataFrame(i) // get DataFrame
  val x = new MyClass(data)                            // initialize MyClass with new object
  x.setSettings(...)
  x.calcSomething()
  x.saveResults()  // writes the results into another DataFrame that is saved to HDFS
}
I think foreach on a Scala List is not parallel, so how can I avoid using foreach here? The calculations on the DataFrames could happen in parallel, as the results of one calculation are NOT input for the next DataFrame. How can I implement this?
Thank you very much!!

Edit 1: What I tried to do:
val l = List(34, 32, 132, 352) // Scala List
var l_DF: List[DataFrame] = List()
l.foreach { i =>
  l_DF = DataContainer.getDataFrame(i) :: l_DF // prepend DataFrame to the list of DataFrames
}
val rdd = sc.parallelize(l_DF) // try to distribute the DataFrames themselves
rdd.foreach { data =>
  val x = new MyClass(data)
}
but this gave:
Invalid tree; null:
null
Edit 2: Okay, I don't get how everything works under the hood...
1) Everything works fine when I execute this in spark-shell:

spark-shell --driver-memory 10g
//...
var l = List(34, 32, 132, 352) // Scala List
l.foreach { i =>
  val data: DataFrame = AllData.where($"a" === i) // get DataFrame
  val x = new MyClass(data)                        // initialize MyClass with new object
  x.calcSomething()
}
2) Error, when I start the same code with:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
3) When I try to parallelize it, I get an error, too:

spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// I can see the parallel execution in the console messages (my class prints some,
// and they are now printed in parallel instead of sequentially)
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)
There are actually more than 10 executors, but on 4 nodes. I never configure the SparkContext; it is already provided on startup.

Answer
You can use Scala's parallel collections to achieve foreach parallelism on the driver side.
val l = List(34, 32, 132, 352).par
l.foreach { i =>
  // your code to be run in parallel for each i
}
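If you want explicit control over how many jobs are submitted concurrently (parallel collections decide the degree of parallelism for you), a `Future`-based variant with a fixed thread pool is an alternative. This is a minimal sketch: `processKey` and the object name are hypothetical stand-ins for the real per-key work (`getDataFrame` + `calcSomething` + `saveResults`).

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelJobs {
  // Stand-in for the real per-key computation; replace with your Spark job.
  def processKey(i: Int): Int = i * 2

  def run(keys: List[Int], parallelism: Int): List[Int] = {
    // Fixed pool bounds how many keys are processed at the same time.
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per key; Future.sequence preserves input order.
      val futures = keys.map(i => Future(processKey(i)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally pool.shutdown()
  }
}
```

For example, `ParallelJobs.run(List(34, 32, 132, 352), 4)` runs all four keys concurrently and returns the results in the original key order.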
However, a word of caution: is your cluster capable of running jobs in parallel? You may submit jobs to your Spark cluster in parallel, but they may end up getting queued on the cluster and executed sequentially.
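Related to that caveat: with the default FIFO scheduler, concurrently submitted jobs from one SparkContext still run one after another once the first job occupies all cores. Enabling FAIR scheduling may let them share executors. A hedged sketch of how that could be passed to spark-shell (the memory/executor values are just the ones from the question):

```shell
# Assumption: FAIR scheduling lets concurrently submitted jobs within one
# application share executor slots instead of queueing FIFO.
spark-shell --master yarn-client --num-executors 10 --driver-memory 10g \
  --conf spark.scheduler.mode=FAIR
```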