How to run a Spark application assembled with Spark 2.1 on a cluster with Spark 1.6?

Question

I've been told that I can build a Spark application with one version of Spark and, as long as I build it with sbt assembly, run it with spark-submit on any Spark cluster.

So I've built my simple application with Spark 2.1.1. You can see my build.sbt file below. Then I start it on my cluster with:

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

So, as you can see, I'm executing it with Spark 1.6.0.

I'm getting this error:

17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J
        at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more

Based on some reading I see that the error java.lang.NoSuchMethodError is typically connected to mixing different versions of Spark. That may well be the case here, since I'm using different ones. But shouldn't sbt assembly cover that? Please see my build.sbt and assembly.sbt files below.

build.sbt

name := "spark-db-synchronizator"

//Versions
version := "1.0.0"
scalaVersion := "2.10.6"
val sparkVersion = "2.1.1"
val sl4jVersion = "1.7.10"
val log4jVersion = "1.2.17"
val scalaTestVersion = "2.2.6"
val scalaLoggingVersion = "3.5.0"
val sparkTestingBaseVersion = "1.6.1_0.3.3"
val jodaTimeVersion = "2.9.6"
val jodaConvertVersion = "1.8.1"
val jsonAssertVersion = "1.2.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.slf4j" % "slf4j-api" % sl4jVersion,
  "org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"),
  "log4j" % "log4j" % log4jVersion % "provided",
  "org.joda" % "joda-convert" % jodaConvertVersion,
  "joda-time" % "joda-time" % jodaTimeVersion,
  "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
  "com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
  "org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test"
)

assemblyJarName in assembly := "db-synchronizer.jar"

run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

// Spark does not support parallel tests and requires JVM fork
parallelExecution in Test := false

fork in Test := true
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Answer

You're correct: it is possible to run a Spark application with Spark 2.1.1 libraries bundled in on some Spark 1.6 environments, such as Hadoop YARN (in CDH or HDP).

This trick is fairly often used in large corporations, where the infrastructure team forces development teams to use an older Spark version simply because the CDH (YARN) or HDP (YARN) distribution does not ship a newer one.

You should use spark-submit from the newer Spark installation (I'd suggest the latest and greatest 2.1.1 as of this writing) and bundle all Spark jars as part of your Spark application.
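For the bundling to actually happen, the Spark dependencies have to stay in the default compile scope, exactly as in the build.sbt above. The counter-example below (shown only for contrast; it reuses the sparkVersion value from the build above) is what to avoid here, because sbt-assembly leaves "provided" dependencies out of the uberjar:

// Do NOT do this if you want spark-core bundled into the assembly
"org.apache.spark" %% "spark-core" % sparkVersion % "provided"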

Just sbt assembly your Spark application with Spark 2.1.1 (as specified in your build.sbt) and spark-submit the uberjar, using that very same Spark 2.1.1 version, to the older Spark environment.

As a matter of fact, Hadoop YARN does not treat Spark any differently from any other application library or framework. It is quite reluctant to pay special attention to Spark.

That, however, requires a cluster environment (I've just checked that it won't work with Spark Standalone 1.6 when your Spark application uses Spark 2.1.1).

In your case, when you started your Spark application with the local[*] master URL, it was not supposed to work:

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

There are two reasons:

  1. local[*] is fairly constrained by the CLASSPATH, and trying to convince Spark 1.6.0 to run Spark 2.1.1 on the same JVM might take you a fairly long time (if it is possible at all).

  2. You are using the older version to run the more current 2.1.1. The opposite could work.

Use Hadoop YARN instead, as... well... it does not pay special attention to Spark, and this has already been tested a few times in my projects.
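As a rough sketch of what that looks like in practice (the Spark 2.1.1 installation directory and the deploy mode are assumptions, not taken from the question; the class and jar are the ones from above), the submission against YARN would be along these lines:

cd spark-2.1.1-bin-hadoop2.6/bin/
spark-submit --class App --master yarn --deploy-mode cluster /home/oracle/spark_test/db-synchronizer.jar

This assumes HADOOP_CONF_DIR (or YARN_CONF_DIR) points at the cluster's Hadoop configuration so spark-submit can locate the YARN ResourceManager.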

"I was wondering how I can tell which version of, e.g., spark-core is used at runtime."

Use the web UI; you should see the version in the top-left corner.
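Besides the web UI, a minimal programmatic sketch (a hypothetical VersionCheck object, not part of the original application) is to log the version constant compiled into whichever spark-core ended up on the runtime classpath:

import org.apache.spark.{SparkConf, SparkContext}

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Version string baked into the spark-core jar that was actually loaded
    println(s"spark-core on the classpath: ${org.apache.spark.SPARK_VERSION}")

    // The running context reports the same value; the master URL is expected
    // to come from spark-submit rather than being hard-coded here
    val sc = new SparkContext(new SparkConf().setAppName("version-check"))
    println(s"SparkContext.version: ${sc.version}")
    sc.stop()
  }
}

If that prints 1.6.0 instead of 2.1.1, the cluster's jars rather than the bundled ones are winning on the CLASSPATH.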

You should also consult the web UI's Environment tab, where you can find the configuration of the runtime environment. That's the most authoritative source about the environment hosting your Spark application.

Near the bottom you should see the Classpath Entries, which give you the CLASSPATH with jars, files and classes.

Use it to track down any CLASSPATH-related issues.
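For jar-level detail, a small sketch (a hypothetical helper using only the standard JVM ProtectionDomain API) prints which jar a given Spark class was actually loaded from:

import org.apache.spark.SparkConf

object WhereIsSpark {
  def main(args: Array[String]): Unit = {
    // getCodeSource can be null for classes loaded by the bootstrap class loader
    val source = Option(classOf[SparkConf].getProtectionDomain.getCodeSource)
    val location = source.map(_.getLocation.toString).getOrElse("unknown (bootstrap class loader)")
    println(s"org.apache.spark.SparkConf loaded from: $location")
  }
}

Running the same check against other Spark classes from the stack trace can tell you whether the bundled jar or the cluster's jar comes first on the CLASSPATH.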
