Spark流媒体+ json4s-jackson依赖问题 [英] Spark streaming + json4s-jackson dependency problems

查看:2013
本文介绍了Spark流媒体+ json4s-jackson依赖问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我无法在我的spark 1.4.1 Streaming应用程序中使用json4s-Jackson 3.2.11。

I am unable to use json4s-Jackson 3.2.11 within my spark 1.4.1 Streaming application.

认为这是导致问题的spark-core项目中现有的依赖关系 - > 是否有可能将json4s 3.2.11与Spark 1.3一起使用.0?我使用调整后的core / pom.xml从源代码构建了Spark。我已经将json4s-jackson_2.10:3.2.10更改为3.2.11,因为2.10版本不支持提取到隐式类型。

Thinking that it was the existing dependency within the spark-core project that is causing the problem as explained here -> Is it possible to use json4s 3.2.11 with Spark 1.3.0? I have built Spark from source with an adjusted core/pom.xml. I have changed the reference from json4s-jackson_2.10:3.2.10 to 3.2.11, as the 2.10 version does not support extracting to implicit types.

我已经替换在我的intellij IDEA项目中使用重建的jar引用的源jar,但是我仍然遇到与以前相同的错误。我担心Spark仍然必须以某种方式引用json4s 3.2.10?

I have replaced the source jars that are referenced in my intellij IDEA project with the rebuilt jars, however I am still getting the same errors as before. I fear that Spark must still be referencing json4s 3.2.10 somehow?

这是我的简单测试:

object StreamingPredictor {

  implicit val formats = DefaultFormats

  case class event(Key: String,
                   sensorId: String,
                   sessionId: String,
                   deviceId: String,
                   playerId: String,
                   impressionId: String,
                   time: String,
                   eventName: String,
                   eventProperties: Map[String, Any],
                   dl: Array[List[(String, Any)]],
                   $post: Boolean,
                   $sync: Boolean)

  def parser(json: String): String = {
    val parsedJson = parse(json)
    val foo = parsedJson.extract[event]
    foo.eventName
  }

  def main(args: Array[String]) {

    val zkQuorum = "localhost:2181"
    val group = "myGroup"
    val topic = Map("test" -> 1)
    val sparkContext = new SparkContext("local[4]","KafkaConsumer")
    val ssc = new StreamingContext(sparkContext, Seconds(1))

    val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    val eventName = json.map(_._2).map(parser)

    eventName.print()

    ssc.start()

  }
}

在我的应用程序pom.xml文件中引用json4s 3.2.11时出现的错误:

The error I get when referencing json4s 3.2.11 in my application pom.xml file:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.render(Lorg/json4s/JsonAST$JValue;)Lorg/json4s/JsonAST$JValue;
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
        at org.apache.spark.scheduler.EventLoggingListener.onBlockManagerAdded(EventLoggingListener.scala:174)
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:46)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

我在我的应用程序pom.xml文件中使用json4s-jackson_2.10:3.2.10时出现的错误:

And the error I get when i use json4s-jackson_2.10:3.2.10 in my application pom.xml file:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.json4s.package$MappingException: No usable value for eventProperties
No information known about type
        at org.json4s.reflect.package$.fail(package.scala:96)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:443)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:463)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:451)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$.extract(Extraction.scala:42)
        at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
        at com.pca.triggar.Streaming.StreamingPredictor$.parser(StreamingPredictor.scala:38)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at com.pca.triggar.Streaming.StreamingPredictor$$anonfun$2.apply(StreamingPredictor.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1276)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.json4s.package$MappingException: No information known about type
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:465)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:491)
        at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:488)
        at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:500)
        at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:488)
        at org.json4s.Extraction$.extract(Extraction.scala:332)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at org.json4s.Extraction$$anonfun$extract$5.apply(Extraction.scala:316)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.json4s.Extraction$.extract(Extraction.scala:316)
        at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:431)
        ... 42 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


推荐答案

好的,我发现了问题。正如在其他地方发布的那样,你需要针对jason4s 3.2.10进行编译。显然,这样做会生成一个二进制文件,然后使用Spark(在我的情况下为1.5版本。在某些早期版本中也是如此)。它与render()方法中的默认参数有关,该参数显示在3.2.11中。

Ok, I found the problem. As posted else where, you need to compile against jason4s 3.2.10. Apparently, doing so generates a binary which would then work with Spark (version 1.5 in my case. Same in some earlier versions as well). It has to do with the default parameter in the render() method which shows up in 3.2.11.

这篇关于Spark流媒体+ json4s-jackson依赖问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆