How to get progress of streaming query after awaitTermination?


Question

I am new to Spark and have been reading about monitoring Spark applications. Basically, I want to know how many records my Spark application processed in a given trigger interval, along with the progress of the query. I know 'lastProgress' provides all of those metrics, but when I use awaitTermination together with 'lastProgress' it always returns null.

import org.apache.spark.sql.streaming.Trigger

val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

println("Query Id: " + q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress)
q4s.awaitTermination()

Output:

Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null

How can I get the progress of my query while using awaitTermination, or how can I keep my query running continuously without using awaitTermination?

Thanks.

Recommended answer

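In your snippet, lastProgress is read once, right after start() and before the first trigger has completed, so no progress update exists yet and the call returns null. awaitTermination() then blocks the calling thread, so nothing reads the progress again.

Using a dedicated runnable thread

You can create a dedicated Thread that continuously prints the last progress of your streaming query.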

First, define a runnable monitoring class that prints the last progress every 10 seconds (10000 ms):

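import java.util.Calendar
import org.apache.spark.sql.streaming.StreamingQuery

// Prints the query's most recent progress every 10 seconds while it is active.
class StreamingMonitor(q: StreamingQuery) extends Runnable {
  override def run(): Unit = {
    // Stop once the query terminates so this thread does not keep the JVM alive.
    while (q.isActive) {
      println("Time: " + Calendar.getInstance().getTime())
      println(q.lastProgress) // null until the first progress update arrives
      Thread.sleep(10000)
    }
  }
}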

Second, wire it into your application code as shown below:

val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

new Thread(new StreamingMonitor(q4s)).start()

q4s.awaitTermination()
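
Because lastProgress can still be null on the first few iterations, it may help to wrap it in an Option before printing. The following is only a minimal sketch: the object ProgressFormat and its methods describeProgress and describeQuery are hypothetical names introduced for illustration, not part of Spark.

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryProgress}

// Hypothetical helper (not part of Spark) for null-safe progress output.
object ProgressFormat {
  // Formats one progress report, or a placeholder when none exists yet.
  def describeProgress(progress: Option[StreamingQueryProgress]): String =
    progress match {
      case Some(p) =>
        s"batch=${p.batchId} numInputRows=${p.numInputRows} " +
          s"processedRowsPerSecond=${p.processedRowsPerSecond}"
      case None => "no progress reported yet"
    }

  // Option(...) turns the null returned before the first update into None.
  def describeQuery(query: StreamingQuery): String =
    describeProgress(Option(query.lastProgress))
}
```

Inside the monitor's loop, println(ProgressFormat.describeQuery(q)) could then replace println(q.lastProgress).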

Looping over query status

You could also run a while loop on the status of the query. Once the loop exits, awaitTermination() returns immediately, or rethrows the exception if the query failed:

val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

while(q4s.isActive) {
  println(q4s.lastProgress)
  Thread.sleep(10000)
}

q4s.awaitTermination()
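
A variation on this loop, if you would rather keep awaitTermination in the picture, is the overload awaitTermination(timeoutMs), which returns true once the query has terminated and false when the timeout elapses first. The sketch below assumes that overload is acceptable for your use case; the object ProgressPolling, the method pollProgress, and the maxPolls parameter are hypothetical names added for illustration.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper (not part of Spark).
object ProgressPolling {
  // Prints the latest progress every `intervalMs` until the query terminates
  // or `maxPolls` waits have elapsed; returns the number of reports printed.
  def pollProgress(query: StreamingQuery, intervalMs: Long, maxPolls: Int): Int = {
    var polls = 0
    var printed = 0
    // awaitTermination(timeoutMs) returns true once the query has terminated
    // and rethrows the query's exception if it terminated with one.
    while (polls < maxPolls && !query.awaitTermination(intervalMs)) {
      // Skip the null that lastProgress returns before the first update.
      Option(query.lastProgress).foreach { p =>
        println(p)
        printed += 1
      }
      polls += 1
    }
    printed
  }
}
```

For the query above, ProgressPolling.pollProgress(q4s, 10000L, Int.MaxValue) would print roughly once per trigger and stop when the query does.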

Alternative solution using StreamingQueryListener

Another way to monitor your streaming query is to use a StreamingQueryListener. Again, first define a class extending StreamingQueryListener:

import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent


class MonitorListener extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"""numInputRows: ${event.progress.numInputRows}""")
    println(s"""processedRowsPerSecond: ${event.progress.processedRowsPerSecond}""")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
}

Then register it with your SparkSession:

spark.streams.addListener(new MonitorListener)
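
If you want a running total of processed records rather than per-trigger output, the listener can accumulate numInputRows in a thread-safe counter. This is only a sketch: RowCountListener and totalInputRows are hypothetical names, and since a listener registered on spark.streams receives events from every query in the session, the total covers all of them.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical listener (not part of Spark) that sums input rows over all triggers.
class RowCountListener extends StreamingQueryListener {
  // AtomicLong because callbacks run on a different thread than the reader.
  val totalInputRows = new AtomicLong(0L)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Add the number of rows read in the trigger that just completed.
    totalInputRows.addAndGet(event.progress.numInputRows)
    ()
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```

After registering an instance with spark.streams.addListener, the main thread can read totalInputRows.get() at any time.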
