How to run Spark application assembled with Spark 2.1 on cluster with Spark 1.6?

Problem description

I've been told that I could build a Spark application with one version of Spark and, as long as I use sbt assembly to build it, I can then run it with spark-submit on any Spark cluster.

So I built my simple application with Spark 2.1.1. You can see my build.sbt file below. Then I'm starting it on my cluster with:

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

So as you see, I'm executing it with Spark 1.6.0.

I'm getting this error:

17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J
        at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
        ... 1 more

Based on some reading, I see that the error java.lang.NoSuchMethodError is typically connected to mixing different versions of Spark. And that might be true, because I'm using different ones. But shouldn't sbt assembly cover that? Please see my build.sbt and assembly.sbt files below.

build.sbt

name := "spark-db-synchronizator"

//Versions
version := "1.0.0"
scalaVersion := "2.10.6"
val sparkVersion = "2.1.1"
val sl4jVersion = "1.7.10"
val log4jVersion = "1.2.17"
val scalaTestVersion = "2.2.6"
val scalaLoggingVersion = "3.5.0"
val sparkTestingBaseVersion = "1.6.1_0.3.3"
val jodaTimeVersion = "2.9.6"
val jodaConvertVersion = "1.8.1"
val jsonAssertVersion = "1.2.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.slf4j" % "slf4j-api" % sl4jVersion,
  "org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"),
  "log4j" % "log4j" % log4jVersion % "provided",
  "org.joda" % "joda-convert" % jodaConvertVersion,
  "joda-time" % "joda-time" % jodaTimeVersion,
  "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
  "com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
  "org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test"
)

assemblyJarName in assembly := "db-synchronizer.jar"

run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

// Spark does not support parallel tests and requires JVM fork
parallelExecution in Test := false

fork in Test := true
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Answer

You're correct, and it is possible to run a Spark application with Spark 2.1.1 libraries bundled on some Spark 1.6 environments, like Hadoop YARN (in CDH or HDP).

The trick is fairly often used in large corporations where the infrastructure team forces development teams to use older Spark versions only because CDH (YARN) or HDP (YARN) does not support newer ones.

You should use spark-submit from the newer Spark installation (I'd suggest the latest and greatest 2.1.1 as of this writing) and bundle all Spark jars as part of your Spark application.

Just sbt assembly your Spark application with Spark 2.1.1 (as you specified in build.sbt) and spark-submit the uber-jar, using the very same Spark 2.1.1 version, to the older Spark environment.
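The build detail that makes this work (already satisfied by the build.sbt in the question) is that the Spark modules stay at the default compile scope rather than "provided", so sbt assembly packs Spark 2.1.1 itself into the uber-jar. A stripped-down sketch of just that fragment:

// A stripped-down sketch mirroring the full build.sbt above: leaving the
// Spark modules at the default compile scope makes `sbt assembly` bundle
// Spark 2.1.1 into db-synchronizer.jar instead of expecting the cluster
// to provide it.
val sparkVersion = "2.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,  // bundled, not "provided"
  "org.apache.spark" %% "spark-streaming" % sparkVersion   // bundled, not "provided"
)

assemblyJarName in assembly := "db-synchronizer.jar"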

As a matter of fact, Hadoop YARN does not treat Spark any better than any other application library or framework; it is quite reluctant to pay any special attention to Spark.

That, however, requires a cluster environment (I just checked that it won't work with Spark Standalone 1.6 when your Spark application uses Spark 2.1.1).

In your case, when you started your Spark application using the local[*] master URL, it was not supposed to work:

cd spark-1.6.0-bin-hadoop2.6/bin/    
spark-submit --class  App --master local[*] /home/oracle/spark_test/db-synchronizer.jar

There are two reasons:

  1. local[*] is fairly constrained by CLASSPATH, and trying to convince Spark 1.6.0 to run Spark 2.1.1 on the same JVM might take you a fairly long time (if it is possible at all); see the sketch after this list.

  2. You are using the older version to run the more current 2.1.1; the opposite could work.
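If you want to attempt that classpath fight anyway (this goes beyond the original answer), Spark ships the experimental spark.driver.userClassPathFirst and spark.executor.userClassPathFirst switches, which prefer the application's bundled jars over the cluster's; there is no guarantee they untangle this particular mix. A hedged sketch:

import org.apache.spark.SparkConf

// A hedged sketch, beyond the original answer: both properties are marked
// experimental in Spark's documentation. The executor-side flag can be set
// in SparkConf; the driver-side flag only takes effect if supplied before
// the driver JVM starts, e.g.
//   spark-submit --conf spark.driver.userClassPathFirst=true ...
val conf = new SparkConf()
  .setAppName("db-synchronizer")
  .set("spark.executor.userClassPathFirst", "true")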

Use Hadoop YARN instead, as... well... it pays no attention to Spark, and this has already been tested a few times in my projects.

"I was wondering how I can know which version of, e.g., spark-core is used at run time?"

Use the web UI, and you should see the version in its top-left corner.

You should also consult the web UI's Environment tab, where you find the configuration of the runtime environment. That's the most authoritative source about the hosting environment of your Spark application.

Near the bottom you should see the Classpath Entries, which give you the CLASSPATH with jars, files and classes.

Use it to find any CLASSPATH-related issues.
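If you prefer a programmatic check over the web UI, the running SparkContext reports its own version, and the JVM can dump its classpath. A minimal sketch (it assumes the application is launched through spark-submit, which supplies the master URL):

import java.io.File
import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch: print the Spark version actually loaded at run time and
// the Spark-related classpath entries, as a command-line alternative to the
// web UI's Environment tab.
object RuntimeVersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runtime-version-check"))
    println(s"Spark version at runtime: ${sc.version}")
    System.getProperty("java.class.path")
      .split(File.pathSeparator)
      .filter(_.toLowerCase.contains("spark"))  // narrow the listing to Spark jars
      .foreach(println)
    sc.stop()
  }
}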
