Opening two KafkaStreams after each other with different StreamingContext

Problem description

I am currently trying to implement a two-stage process in Spark Streaming. First I open a kafkaStream, read everything that is already in the topic by using auto.offset.reset=earliest, and train my model on it. I use a stream for that because I could not find out how to do it without opening a stream first (Spark - Get earliest and latest offset of Kafka without opening stream). As I have not discovered a way to stop the stream without stopping the whole StreamingContext, I stop the context after the model calculation with ssc.stop(true, true).

When I then create a new StreamingContext (using either the old sparkConf or a new one with the same parameters) and call my method to open a new KafkaStream with a new groupId and auto.offset.reset=latest, it looks like no streaming is happening at all when I write new content to the Kafka topic. Neither print() nor count() nor a println inside foreachRDD produces any output in my IDE.
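For reference, the method that opens such a KafkaStream could look roughly like the sketch below (a minimal sketch, assuming the spark-streaming-kafka-0-10 direct API; the broker list, topic name and the openKafkaStream helper itself are placeholders). The training stream is opened with auto.offset.reset=earliest, the prediction stream with a fresh groupId and latest:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._

// Sketch: open a direct Kafka stream; offsetReset is "earliest" for the training
// stream and "latest" for the prediction stream, each with its own consumer group.
def openKafkaStream(ssc: StreamingContext, groupId: String, offsetReset: String) = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",             // placeholder broker list
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> offsetReset,
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Seq("spans"), kafkaParams)) // placeholder topic
}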

The structure of the application looks like:

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)
      .set("spark.local.dir", sparkLocalDir)
      .set("spark.driver.allowMultipleContexts", "true")

    sparkConf.registerKryoClasses(Array(classOf[Span]))
    sparkConf.registerKryoClasses(Array(classOf[Spans]))
    sparkConf.registerKryoClasses(Array(classOf[java.util.Map[String, String]]))

    val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
    trainingSsc.checkpoint(checkpoint)
    //val predictor = (model, ninetynine, median, avg, max)
    val result = trainKMeans(trainingSsc);

    trainingSsc.stop(true, false)   // stopSparkContext = true, stopGracefully = false

    val predictionSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
    val threshold = result._5
    val model = result._1

    kMeansAnomalyDetection(predictionSsc, model, threshold)  
}

I hope you can point me to the mistake I made - and if you need further details just let me know. Any help and hints are much appreciated.

Solution

In general, the program looks like it's going in the right direction, but there are a few points that need fixing:

Spark Streaming starts the streaming scheduler when streamingContext.start() is issued. DStream operations are only executed by that scheduler. This means that sequencing these two calls will not bear any results:

val result = trainKMeans(trainingSsc);
trainingSsc.stop(true, false)

The streaming context will be stopped before any training could take place.

Instead, we should do this:

val result = trainKMeans(trainingSsc)
// foreachRDD is a method of the DStream, not of the StreamingContext, so it is
// called on the training stream created inside trainKMeans (named trainingStream here)
trainingStream.foreachRDD { _ => trainingSsc.stop(false, false) } // note that we don't stop the spark context here
trainingSsc.start()
trainingSsc.awaitTermination()

In this case, we start the streaming process; we let the first interval execute, in which our model will be trained, and then we stop the processing.
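To make the training step more concrete, here is a minimal sketch of the kind of operations trainKMeans could register before start() is called, assuming MLlib's StreamingKMeans and an already extracted DStream of feature vectors. The cluster count, feature dimension and seed are placeholders, and the distance statistics the real trainKMeans returns are omitted; a stream like this features stream (or the underlying Kafka stream) is what the stop callback above has to be registered on.

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

// Sketch: register streaming k-means training on a DStream of feature vectors.
// Nothing runs until the StreamingContext is started.
def registerTraining(features: DStream[Vector]): StreamingKMeans = {
  val kmeans = new StreamingKMeans()
    .setK(5)                         // placeholder cluster count
    .setDecayFactor(1.0)
    .setRandomCenters(2, 0.0, 42L)   // placeholder feature dimension, weight and seed
  kmeans.trainOn(features)           // executed once per interval by the scheduler
  kmeans                             // kmeans.latestModel() yields the trained model afterwards
}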

The second stream should be started on a different consumer group than the first one (the Kafka stream creation is not shown in the code snippet).
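Concretely, the prediction stream's Kafka parameters should override at least these two entries relative to the training stream (a sketch; the group name is a placeholder):

// Only these entries need to differ from the training stream's kafkaParams:
val predictionOverrides = Map[String, Object](
  "group.id" -> "prediction-group",  // fresh consumer group (placeholder name)
  "auto.offset.reset" -> "latest"    // only records written after this stream starts
)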

For the second streaming context, we are missing a start:

val predictionSsc = new StreamingContext(sparkContext, Seconds(batchInterval)) // note that we pass a sparkContext here, not a config. We reuse the same spark context.
val threshold = result._5
val model = result._1
kMeansAnomalyDetection(predictionSsc, model, threshold) 
predictionSsc.start()
predictionSsc.awaitTermination()

We should have a working stream at this point.
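Putting the pieces together, the corrected main flow could look like the following sketch. It keeps trainKMeans and kMeansAnomalyDetection from the question as black boxes, and assumes the stop-after-first-batch callback shown above is registered on the training DStream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)

// 1) Training pass: read the topic from the beginning, stop after the first batch.
val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
val result = trainKMeans(trainingSsc)        // registers the training operations
// the stop-after-first-batch foreachRDD callback (see above) goes on the training DStream
trainingSsc.start()
trainingSsc.awaitTermination()               // returns once the training context is stopped

// 2) Prediction pass: reuse the same SparkContext, new consumer group, latest offsets.
val predictionSsc = new StreamingContext(trainingSsc.sparkContext, Seconds(batchInterval))
kMeansAnomalyDetection(predictionSsc, result._1, result._5)   // model and threshold
predictionSsc.start()
predictionSsc.awaitTermination()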
