Opening two KafkaStreams after each other with different StreamingContext


Problem description

I am currently trying to implement a two-stage process in Spark Streaming. First I open a kafkaStream, read everything that is already in the topic by using auto.offset.reset=earliest, and train my model on it. I use a stream for that because I could not find out how to do it without opening one (see Spark - Get earliest and latest offset of Kafka without opening stream). As I have not discovered a way to stop a stream without stopping the whole StreamingContext, I stop the context after the model calculation with ssc.stop(true, true).

When I now try to create a new StreamingContext (using either the old sparkConfig or a new one with the same parameters) and call my method to open a new KafkaStream with a new groupId and auto.offset.reset=latest, it looks like no streaming is happening at all when I write new content to the Kafka topic. Neither print() nor count() nor a println inside forEachRDD produces any output in my IDE.
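
For context, the training-phase stream described above could be opened roughly like this. This is only a sketch using the spark-streaming-kafka-0-10 direct-stream API; the broker address, the topic name ("spans"), and the consumer-group name are placeholder assumptions, not values from the question.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// parameters for the training phase: replay the whole topic under a dedicated consumer group
val trainingKafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",            // placeholder broker address
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "training-group",            // placeholder consumer group
  "auto.offset.reset"  -> "earliest",                  // read everything already in the topic
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val trainingStream = KafkaUtils.createDirectStream[String, String](
  trainingSsc, PreferConsistent, Subscribe[String, String](Array("spans"), trainingKafkaParams))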

The application is structured as follows:

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)
      .set("spark.local.dir", sparkLocalDir)
      .set("spark.driver.allowMultipleContexts", "true")

    sparkConf.registerKryoClasses(Array(classOf[Span]))
    sparkConf.registerKryoClasses(Array(classOf[Spans]))
    sparkConf.registerKryoClasses(Array(classOf[java.util.Map[String, String]]))

    val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
    trainingSsc.checkpoint(checkpoint)
    //val predictor = (model, ninetynine, median, avg, max)
    val result = trainKMeans(trainingSsc);

    trainingSsc.stop(true, false)

    val predictionSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
    val threshold = result._5
    val model = result._1

    kMeansAnomalyDetection(predictionSsc, model, threshold)  
}

I hope you can point me to the mistake I made - and if you need further details just let me know. Any help and hints are much appreciated.

Answer

In general, the program looks like it's going in the right direction, but there are a few points that need fixing:

Spark Streaming will start the streaming scheduler only when streamingContext.start() is issued, and DStream operations will only be executed by that scheduler. This means that sequencing these two calls will not bear any results:

val result = trainKMeans(trainingSsc);
trainingSsc.stop(true, false)

The streaming context will be stopped before any training could take place.

Instead, we should do this:

val result = trainKMeans(trainingSsc)
// foreachRDD is a DStream operation, so the stop hook is registered on the DStream used for
// training (e.g. trainingStream from the sketch in the question above); note that we don't
// stop the spark context here
trainingStream.foreachRDD { _ => trainingSsc.stop(false, false) }
trainingSsc.start()
trainingSsc.awaitTermination()

In this case, we start the streaming process; we let the first interval execute, in which our model will be trained, and then we stop the processing.
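
One practical caveat with that stop hook: calling stop() on the StreamingContext from inside a running batch can block, because stop waits for the job scheduler to shut down. A slightly more defensive sketch of the same idea, reusing the trainingStream name from the sketch in the question above (the training call itself is elided), stops from a separate thread once the first non-empty batch has been processed:

trainingStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // ... train / update the KMeans model on this batch ...
    // stop asynchronously so the current batch can finish; keep the SparkContext alive
    // because the prediction phase will reuse it
    new Thread(new Runnable {
      def run(): Unit = trainingSsc.stop(stopSparkContext = false, stopGracefully = false)
    }).start()
  }
}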

The second stream should be started on a different consumer group than the first one (the Kafka stream creation is not shown in the code snippet).
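
For illustration, and reusing trainingKafkaParams from the sketch in the question above, the prediction-phase parameters would differ only in the consumer group and the offset-reset policy (the group name is again a placeholder):

val predictionKafkaParams = trainingKafkaParams ++ Map[String, Object](
  "group.id"          -> "prediction-group",   // a fresh consumer group, distinct from the training one
  "auto.offset.reset" -> "latest"              // only consume records written after this stream starts
)

val predictionStream = KafkaUtils.createDirectStream[String, String](
  predictionSsc, PreferConsistent, Subscribe[String, String](Array("spans"), predictionKafkaParams))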

For the second streaming context, we are missing a start:

val predictionSsc = new StreamingContext(sparkContext, Seconds(batchInterval)) // note that we pass a sparkContext here, not a config. We reuse the same spark context.
val threshold = result._5
val model = result._1
kMeansAnomalyDetection(predictionSsc, model, threshold) 
predictionSsc.start()
predictionSsc.awaitTermination()

We should have a working stream at this point.
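
Putting the corrections together, the overall shape of main would then be roughly as follows. This is a sketch only: trainKMeans and kMeansAnomalyDetection are the question's own methods, and they are assumed to register their DStream work (including the stop hook for the training phase) on their respective contexts.

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)
    .set("spark.local.dir", sparkLocalDir)
  sparkConf.registerKryoClasses(Array(classOf[Span]))
  sparkConf.registerKryoClasses(Array(classOf[Spans]))
  sparkConf.registerKryoClasses(Array(classOf[java.util.Map[String, String]]))

  // phase 1: the training context reads the topic from the beginning and stops itself
  // from inside foreachRDD once the first batch has been processed (see the sketch above)
  val trainingSsc = new StreamingContext(sparkConf, Seconds(batchInterval))
  trainingSsc.checkpoint(checkpoint)
  val result = trainKMeans(trainingSsc)
  trainingSsc.start()
  trainingSsc.awaitTermination()   // returns once the training context has been stopped

  // phase 2: the prediction context is built on the still-running SparkContext rather than
  // on a new config, so no second SparkContext (and no allowMultipleContexts) is needed
  val predictionSsc = new StreamingContext(trainingSsc.sparkContext, Seconds(batchInterval))
  kMeansAnomalyDetection(predictionSsc, result._1, result._5)   // model and threshold, as in the question
  predictionSsc.start()
  predictionSsc.awaitTermination()
}

With this shape, the single SparkContext in the driver is created once and shared by both streaming phases.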
