Scala fat jar dependency issue while submitting a job


Problem description

I have written a simple Kafka stream using Scala. It works fine locally. I built a fat jar and submitted it to the Spark cluster. After submitting the job I get a class-not-found error. If I extract the fat jar, all the dependencies are inside it.

Why am I getting the class-not-found error, and how can I solve it?

Note: if I manually deploy (copy) the fat jar into the Spark/jars folder, I don't see any issue. But that is not the correct approach.
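For completeness, spark-submit also has flags for supplying dependencies at submit time instead of copying jars into the Spark installation. A sketch, reusing the master URL and jar path from the submit command in this question (the kafka-clients jar name and location are illustrative):

```shell
rem Option 1: ship an extra jar explicitly alongside the application jar
spark-submit --class Welcome --master spark://169.254.208.125:7077 --jars kafka-clients-0.10.1.0.jar C:\Gnana\cass-conn-assembly-0.1.jar

rem Option 2: let Spark resolve the Kafka integration from Maven Central
spark-submit --class Welcome --master spark://169.254.208.125:7077 --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 C:\Gnana\cass-conn-assembly-0.1.jar
```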

I am using Windows 7, running the master and worker node on the same machine.

Job submission

spark-2.2\bin>spark-submit --class Welcome --master spark://169.254.208.125:7077 C:\Gnana\cass-conn-assembly-0.1.jar

Code

import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector.streaming._


object Welcome {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("demo").setMaster("spark://169.254.208.125:7077");
    conf.set("spark.cassandra.connection.host", "192.168.1.2")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Milliseconds(100))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val lines = messages.map(_.value)
    lines.map { li =>
      val arr = li.split(",")
      (arr(0).toInt, arr(1), arr(2), arr(3))
    }.saveToCassandra("inventory", "emp", SomeColumns("emp_id", "create_date", "emp_city", "emp_name"))
    println(" Spark is ready !!!!!! ");

    /*sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }*/

    ssc.start();
    ssc.awaitTermination();
  }
  def sayHello(msg:String): Unit = {
    print("welcome to Sacala "+msg);
  }
}
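The per-record transformation inside the stream above can be pulled out as a plain function, which makes it easy to sanity-check the CSV parsing without Spark or Kafka running (the helper name parseEmp is ours, not part of the original code):

```scala
// Standalone sketch of the mapping applied to each Kafka message:
// "emp_id,create_date,emp_city,emp_name" -> (Int, String, String, String)
def parseEmp(line: String): (Int, String, String, String) = {
  val arr = line.split(",")
  (arr(0).toInt, arr(1), arr(2), arr(3))
}

// parseEmp("1,2018-01-01,Chennai,Gnana") == (1, "2018-01-01", "Chennai", "Gnana")
```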

build.sbt

organization := "com.demo"
name := "cass-conn"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion  % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
  "org.apache.spark" %% "spark-streaming" %  "2.2.0"  % "provided"
)

mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (mergeStrategy in assembly).value(x)
}

Error 1:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
        at org.apache.spark.streaming.kafka010.ConsumerStrategies$.Subscribe(ConsumerStrategy.scala:256)
        at Welcome$.main(Welcome.scala:32)
        at Welcome.main(Welcome.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sca
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.Consumer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 12 more
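The NoClassDefFoundError is thrown in the driver while ConsumerStrategies.Subscribe is being constructed, i.e. before any executor is involved, so the kafka-clients classes are missing from the driver's classpath even though the build log that follows shows them going into the assembly. A small diagnostic sketch (not part of the original code) that could be dropped into main to confirm what the driver JVM can actually see:

```scala
// Diagnostic only: check whether the Kafka consumer API is visible,
// and print where the current class was actually loaded from.
try {
  Class.forName("org.apache.kafka.clients.consumer.Consumer")
  println("kafka-clients IS on the driver classpath")
} catch {
  case _: ClassNotFoundException =>
    println("kafka-clients is NOT on the driver classpath")
}
println("loaded from: " + getClass.getProtectionDomain.getCodeSource.getLocation)
```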

Build log

[info] Done updating.
[info] Compiling 2 Scala sources to C:\Gnana\cass-conn\target\scala-2.11\classes ...
[info] Done compiling.
[info] Including: slf4j-api-1.7.21.jar
[info] Including: joda-time-2.3.jar
[info] Including: kafka_2.11-0.10.1.0.jar
[info] Including: scala-library-2.11.8.jar
[info] Including: jopt-simple-4.9.jar
[info] Including: metrics-core-2.2.0.jar
[info] Including: slf4j-log4j12-1.7.21.jar
[info] Including: log4j-1.2.17.jar
[info] Including: joda-convert-1.2.jar
[info] Including: zkclient-0.9.jar
[info] Including: scala-reflect-2.11.8.jar
[info] Including: zookeeper-3.4.8.jar
[info] Including: jline-0.9.94.jar
[info] Including: netty-3.7.0.Final.jar
[info] Including: lz4-1.3.0.jar
[info] Including: scala-parser-combinators_2.11-1.0.4.jar
[info] Including: snappy-java-1.1.2.6.jar
[info] Including: kafka-clients-0.10.1.0.jar
[info] Including: spark-streaming-kafka-0-10_2.11-2.2.0.jar
[info] Including: spark-tags_2.11-2.2.0.jar
[info] Including: unused-1.0.0.jar
[info] Including: netty-all-4.0.33.Final.jar
[info] Including: commons-beanutils-1.9.3.jar
[info] Including: jsr166e-1.1.0.jar
[info] Including: spark-cassandra-connector_2.11-2.0.7.jar
[info] Including: commons-collections-3.2.2.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'NOTICE' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE' with strategy 'rename'
[warn] Merging 'org\xerial\snappy\native\README' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\license' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE' with strategy 'rename'
[warn] Merging 'META-INF\DEPENDENCIES' with strategy 'discard'
[warn] Merging 'META-INF\INDEX.LIST' with strategy 'discard'
[warn] Merging 'META-INF\MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.xml' with strategy 'discard'
[warn] Merging 'org\apache\spark\unused\UnusedStubClass.class' with strategy 'first'
[warn] Strategy 'discard' was applied to 51 files
[warn] Strategy 'first' was applied to a file
[warn] Strategy 'rename' was applied to 9 files
[info] SHA-1: 85f8513511b46290883ab70f2525b04a8d3c33d7

Recommended answer

Try changing your spark-streaming-kafka dependency to

"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"

Build a fresh fat jar and see if this solves the problem.

The final build.sbt looks like:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion  % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion  % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" %  "2.2.0"
)
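Before resubmitting, it is worth verifying which classes actually made it into the rebuilt assembly. A quick check on Windows, using the jar path from earlier in the question:

```shell
rem List the fat jar contents and filter for the Kafka consumer classes
jar tf C:\Gnana\cass-conn-assembly-0.1.jar | findstr "org/apache/kafka/clients/consumer"
```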

