Spark multiple contexts

Question

In short:

EC2 cluster: 1 master 3 slaves

Spark version: 1.3.1

I want to use the option spark.driver.allowMultipleContexts to run two contexts in one program: one local (master only) and one on the cluster (master and slaves).

I get this stack trace (line 29 is where I call the object that initializes the second SparkContext):

fr.entry.Main.main(Main.scala)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
   at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
   at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
   at fr.entry.cluster$.<init>(Main.scala:79)
   at fr.entry.cluster$.<clinit>(Main.scala)
   at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
   at scala.App$class.main(App.scala:71)
   at fr.entry.Main$.main(Main.scala:14)
   at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

More details:

I would like to run one program that does two things. First, I have a local SparkContext (on the master only), with which I create an RDD and run some operations. Second, I have another SparkContext, initialized with a master and 3 slaves, which also creates an RDD and runs some operations. So in the first case I want to use the 16 cores of the master, and in the second case the 8 cores x 3 of the slaves.

Simple example:

val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count()) 
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)

My two SparkContexts:

import org.apache.spark.{SparkConf, SparkContext}

object local {

  val project = "test"
  val version = "1.0"

  val sc = new SparkConf()
    .setMaster("local[16]")
    .setAppName("Local")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "45g")

  val sparkContext = new SparkContext(sc)
}

object cluster {

  val project = "test"
  val version = "1.0"

  val sc = new SparkConf()
    .setMaster(masterURL)  // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
    .setAppName("Cluster")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "35g")

  val sparkContext = new SparkContext(sc)
}

How can I fix this?

Recommended answer

Although the configuration option spark.driver.allowMultipleContexts exists, it is misleading, because using multiple Spark contexts is discouraged. The option exists only for Spark's internal tests and is not meant to be used in user programs. Running more than one Spark context in a single JVM can produce unexpected results.
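
One possible way to keep a single active context at a time is to run the two phases one after the other and stop the first context before creating the second. The following is only a sketch under assumptions that are not part of the answer: the names SequentialContexts, withContext and run are hypothetical, both master URLs are passed in as parameters, and the jar, memory and spark.local.dir settings from the question are left out for brevity. It does not set spark.driver.allowMultipleContexts at all.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, not from the question or the answer.
object SequentialContexts {

  // Creates a context, runs `body` with it, and always stops the context
  // afterwards, so at most one SparkContext is alive in the JVM at a time.
  def withContext[T](conf: SparkConf)(body: SparkContext => T): T = {
    val sc = new SparkContext(conf)
    try body(sc) finally sc.stop()
  }

  // Runs the local phase first, then the cluster phase.
  // Returns (row count from the local phase, total sum from the cluster phase).
  def run(localMaster: String, clusterMaster: String): (Long, Double) = {
    val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

    // Phase 1: e.g. "local[16]" to use the cores of the master only.
    val localConf = new SparkConf().setMaster(localMaster).setAppName("Local")
    val localCount = withContext(localConf) { sc =>
      sc.makeRDD(arr).count()
    }

    // Phase 2: e.g. the spark:// URL of the standalone master.
    val clusterConf = new SparkConf().setMaster(clusterMaster).setAppName("Cluster")
    val clusterSum = withContext(clusterConf) { sc =>
      sc.makeRDD(arr).map(l => l.sum).sum()
    }

    (localCount, clusterSum)
  }
}
```

An RDD belongs to the context that created it, so anything the second phase needs from the first has to be brought back to the driver (as the count is here) or written to storage before the first context is stopped.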
