启动Spark流上下文时出错 [英] Error in starting Spark streaming context

查看:76
本文介绍了启动Spark流上下文时出错的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Spark Streaming的新手,正在为twitter连接器编写代码.当我多次运行此代码时,它给出了以下异常.我必须为每次检查点创建一个新的hdfs目录,以使其成功运行,而且它不会停止运行.

I am new to Spark Streaming and writing a code for twitter connector. when i run this code more than one time, it gives the following exception. I have to create a new hdfs directory for checkpointing each time to make it run successfully and moreover it doesn't get stopped.

 ERROR StreamingContext: Error starting the context, marking it as stopped
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
    at twitter.streamingSpark$.twitterConnector(App.scala:38)
    at twitter.streamingSpark$.main(App.scala:26)
    at twitter.streamingSpark.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

相关代码是

 def twitterConnector() :Unit =
 {
     val atwitter=managingCredentials()

   val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> {   managingContext() })
   fetchTweets(ssc, atwitter )

   ssc.start()             // Start the computation
   ssc.awaitTermination()

   }

   def managingContext():StreamingContext =
  {
   //making spark context
   val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
   val ssc = new StreamingContext(conf, Seconds(1))
   val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
   import sqlContext.implicits._

   //checkpointing  
   ssc.checkpoint("hdfsDirectory")
   ssc
   }
    def fetchTweets (ssc : StreamingContext , atwitter : Option[twitter4j.auth.Authorization]) : Unit = {


   val tweets =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)
   val twt = tweets.window(Seconds(10),Seconds(10))
  //checkpoint duration
  /twt.checkpoint(new Duration(1000))

   //processing
   case class Tweet(createdAt:Long, text:String)
   twt.map(status=>
   Tweet(status.getCreatedAt().getTime()/1000, status.getText())
   )
   twt.print()
  }

有人可以在这方面帮助我吗?

Can anyone help me in this regards?

推荐答案

您面临的问题应与以下事实有关:您必须将转换和操作的管理置于 DStream 上应该在 managingContext 函数中执行.

The issue you're facing should be related to the fact that you have to put the management of your tranformations and action on DStreams should be performed in the managingContext function.

因此,在 managingContext 内部移动 fetchTweets(ssc,atwitter)应该可以解决此问题.

Thus, moving fetchTweets(ssc, atwitter ) inside managingContext should solve the issue.

这篇关于启动Spark流上下文时出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆