Is there a way to dynamically stop Spark Structured Streaming?

Problem description

In my scenario I have several datasets that arrive every now and then and need to be ingested into our platform. The ingestion process involves several transformation steps, one of them being Spark. In particular, I use Spark Structured Streaming so far. The infrastructure also involves Kafka, from which Spark Structured Streaming reads data.
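
For context, here is a minimal sketch of that setup (the broker address, topic name, sink format, and paths are hypothetical placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ingestion").getOrCreate()

// the incoming dataset arrives on a Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
  .option("subscribe", "ingestion-topic")           // hypothetical topic
  .load()

// ...several transformation steps on df...

// write the result out; the parquet sink is just for illustration
val query = df.writeStream
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/checkpoints")
  .start()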

I wonder if there is a way to detect when there is nothing left to consume from a topic for a while, in order to decide to stop the job. That is, I want to run it for the time it takes to consume that specific dataset and then stop it. For specific reasons we decided not to use the batch version of Spark.

Hence, is there any timeout or similar mechanism that can be used to detect that there is no more data coming in and that everything has been processed?

Thanks

Answer

A few suggestions:

  1. As @Michael West pointed out, there are listeners to track progress
  2. From what I gather, Structured Streaming doesn't yet support graceful shutdown

So one option is to periodically check for query activity and dynamically shut down depending on a configurable state (when you determine that no further progress can or should be made):

// where you configure your spark job...
spark.streams.addListener(shutdownListener(spark))

// your job code starts here by calling "start()" on the stream...

// periodically await termination, checking for your shutdown state
// ("shutdown" is the flag set by the listener defined below)
while (!spark.sparkContext.isStopped) {
  if (shutdown) {
    println("Shutting down since first batch has completed...")
    spark.streams.active.foreach(_.stop())
    spark.stop()
  } else {
    // wait 10 seconds before checking again if work is complete
    spark.streams.awaitAnyTermination(10000)
  }
}

Your listener can trigger the shutdown in a variety of ways. For instance, if you're only waiting on a single batch, then just shut down after the first update:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

var shutdown = false
def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = println("Query started: " + queryStarted.id)
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = println("Query terminated: " + queryTerminated.id)
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = shutdown = true // stop after the first progress update
}

Or, if you need to shut down after more complicated state changes, you could inspect queryProgress.progress (or parse its json body) to determine whether or not to shut down when a given onQueryProgress event fires.
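
For example, a minimal sketch that only flags shutdown after several consecutive empty micro-batches, using the numInputRows field of StreamingQueryProgress (the three-batch idle threshold is an arbitrary choice):

var idleBatches = 0

override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
  // numInputRows == 0 means this micro-batch processed no new records
  if (queryProgress.progress.numInputRows == 0) idleBatches += 1
  else idleBatches = 0
  if (idleBatches >= 3) shutdown = true // arbitrary idle threshold
}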
