Spark Streaming not remembering previous state

Problem description

I wrote a Spark Streaming program with a stateful transformation. The application seems to be doing the computation correctly with checkpointing enabled. But if I terminate the program and start it again, it does not read the previous checkpoint data and instead starts from the beginning. Is this the expected behaviour?

Do I need to change anything in my program so that it remembers the previous data and resumes the computation from there?

Thanks.

My program, for reference:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

object HBaseStream {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream(<hostname>, 9999)
    ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
    inputStream.print(1)

    // Parse each line into (key, array of numeric values)
    val parsedStream = inputStream
      .map { line =>
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      }

    // Fold each batch's arrays into the running per-key state (element-wise sum)
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption))
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation and wait for it to terminate
    ssc.start()
    ssc.awaitTermination()
  }
}

Answer

According to the Spark Streaming documentation, you should initialize the context a bit differently:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

See Checkpointing in the Spark Streaming Programming Guide.
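
For reference, a minimal sketch of how the program in the question might be restructured around this pattern is shown below. It keeps the placeholder hostnames, checkpoint path and the Blaher.blah helper from the question; the key change is that all of the DStream setup moves inside the factory function passed to StreamingContext.getOrCreate, so the context can be rebuilt from the checkpoint on restart.

// Sketch only: hostnames, the checkpoint path and Blaher.blah are placeholders from the question.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

object HBaseStream {

  // All stream setup lives in the factory so it can be recreated from checkpoint data.
  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    val parsedStream = ssc.socketTextStream("<hostname>", 9999)
      .map { line =>
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      }

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption))
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs://<hostname1>:8020/user/spark/checkpoints_dir"
    // Recover the context from checkpoint data if it exists, otherwise build a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}

On the first run the factory builds a fresh context; on a restart, getOrCreate reconstructs the context, including the updateStateByKey state, from the checkpoint directory instead of calling the factory again.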
