How to get progress of streaming query after awaitTermination?
Question
I am new to Spark and have been reading about monitoring Spark applications. Basically, I want to know how many records my Spark application processed in a given trigger interval, as well as the progress of the query. I know 'lastProgress' gives all those metrics, but when I use awaitTermination together with 'lastProgress', it always returns null.
val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

println("Query Id: " + q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress)
q4s.awaitTermination()
Output:
Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null
How can I get the progress of my query while using awaitTermination, or how can I keep my query running continuously without using awaitTermination?
Thanks.
Recommended answer
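In the question, lastProgress is read immediately after start(), before the query has completed its first trigger, so it is still null; awaitTermination() then blocks the calling thread, so it is never read again.
Using a dedicated runnable thread
You can create a dedicated thread that continuously prints the last progress of your streaming query.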
First, define a runnable monitoring class that prints the last progress every 10 seconds (10000 ms):
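import java.util.Calendar
import org.apache.spark.sql.streaming.StreamingQuery

class StreamingMonitor(q: StreamingQuery) extends Runnable {
  override def run(): Unit = {
    // Stop polling once the query ends so this thread does not keep the JVM alive.
    while (q.isActive) {
      println("Time: " + Calendar.getInstance().getTime())
      // lastProgress is null until the first trigger has completed.
      println(q.lastProgress)
      Thread.sleep(10000)
    }
  }
}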
Second, use it in your application code as follows:
val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

new Thread(new StreamingMonitor(q4s)).start()
q4s.awaitTermination()
Looping over query status
You could also use a while loop on the status of the query:
val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

while (q4s.isActive) {
  println(q4s.lastProgress)
  Thread.sleep(10000)
}
q4s.awaitTermination()
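The loop above can also be written without a separate sleep, using the overload of awaitTermination that takes a timeout in milliseconds. The following is only a sketch: the object name ProgressPolling and the helper awaitWithProgress are hypothetical and not part of Spark, and the 10-second interval is carried over from the examples above.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object ProgressPolling {
  // Hypothetical helper (not a Spark API): blocks until the query terminates
  // and prints the most recent progress roughly every `intervalMs` milliseconds.
  def awaitWithProgress(query: StreamingQuery, intervalMs: Long = 10000L): Unit = {
    // awaitTermination(timeoutMs) returns false if the query is still running
    // when the timeout expires, and true once the query has terminated.
    while (!query.awaitTermination(intervalMs)) {
      // lastProgress is null until the first trigger has completed,
      // so wrapping it in Option skips the initial null.
      Option(query.lastProgress).foreach(p => println(p.prettyJson))
    }
  }
}
```

With this helper, the last lines of the application would become ProgressPolling.awaitWithProgress(q4s) in place of the while loop and the final awaitTermination() call. As with the plain awaitTermination(), a failure inside the query is rethrown to the caller.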
Alternative solution using StreamingQueryListener
Another way to monitor your streaming query is to use a StreamingQueryListener. Again, first define a class extending StreamingQueryListener:
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

class MonitorListener extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"""numInputRows: ${event.progress.numInputRows}""")
    println(s"""processedRowsPerSecond: ${event.progress.processedRowsPerSecond}""")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
}
Then register it with your SparkSession:
spark.streams.addListener(new MonitorListener)
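Since the question asks how many records were processed, a listener can also keep a running total across triggers. The sketch below is one possible way to do that, not the answer's own code: RowCountListener and ListenerDemo are hypothetical names, and the built-in rate source, the local master, and the 30-second timeout are assumptions chosen only to make the example self-contained (the question itself reads from Kafka).

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener: accumulates the number of input rows over all triggers.
class RowCountListener extends StreamingQueryListener {
  // Callbacks are invoked on a listener thread, so the counter is atomic.
  val totalRows = new AtomicLong(0L)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    val total = totalRows.addAndGet(p.numInputRows)
    println(s"batch ${p.batchId}: ${p.numInputRows} rows, $total rows in total")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used here for demonstration only.
    val spark = SparkSession.builder().master("local[2]").appName("listener-demo").getOrCreate()

    // Register the listener before starting the query so that the
    // onQueryStarted event and the first progress events are not missed.
    val listener = new RowCountListener
    spark.streams.addListener(listener)

    // Assumption: the "rate" test source stands in for the Kafka source.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .format("console")
      .start()

    // Let the query run for about 30 seconds, then stop it.
    query.awaitTermination(30000L)
    query.stop()
    println(s"Total input rows: ${listener.totalRows.get}")
    spark.stop()
  }
}
```

Because the listener is invoked by Spark on its own thread, the main thread can still block in awaitTermination() while the metrics are reported.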