How to get last 1 hour data, every 5 minutes, without grouping?
Problem Description
How to trigger every 5 minutes and get the data for the last 1 hour? I came up with this, but it does not seem to give me all the records from the last 1 hour. My reasoning is:

- Read the stream,
- filter the data for the last 1 hour based on the timestamp column, and
- write/print using foreachBatch, and
- watermark it so that it does not hold on to all the past data.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{current_timestamp, expr, window}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "60 minutes")
  .filter($"ts" > current_timestamp() - expr("INTERVAL 60 minutes"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.collect().foreach(println)
  }
  .start()
Or do I have to use a window? But I can't seem to get rid of the groupBy if I use window, and I don't want to group.
spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "1 hour")
  .groupBy(window($"ts", "1 hour"))
  .count()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    print("...entering foreachBatch...\n")
    batchDF.collect().foreach(println)
  }
  .start()
Recommended Answer
Instead of using Spark Streaming to execute your Spark code every 5 minutes, you should use either an external scheduler (cron, etc.) or the java.util.Timer API if you want to schedule the processing from within your code.
If you use Spark Streaming to schedule the code, you will run into two issues.
The first issue: Spark Streaming processes data only once, so every 5 minutes only the new records are loaded. You could think of bypassing this by using a window function and retrieving the aggregated list of rows with collect_list or a user-defined aggregate function, but then you will hit the second issue.
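For illustration, here is a minimal sketch of that workaround, assuming the same delta table "xxx" and timestamp column "ts" as in the question: a 1-hour window sliding every 5 minutes, with the raw rows of each window gathered by collect_list. Note that it still requires a groupBy and, as explained below, still only fires when new records arrive.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{collect_list, struct, window}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "1 hour")
  // sliding window: each row falls into the 12 one-hour windows
  // that start every 5 minutes and contain its timestamp
  .groupBy(window($"ts", "1 hour", "5 minutes"))
  .agg(collect_list(struct($"*")).as("rows")) // keep the raw rows, not just a count
  .writeStream
  .outputMode("update") // emit the windows updated in each micro-batch
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.collect().foreach(println)
  }
  .start()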
The second issue: although your processing will be triggered every 5 minutes, the function inside foreachBatch will be executed only if there are new records to process. If no new records arrive during the 5-minute interval between two executions, nothing happens.
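A minimal sketch to observe this behavior, again assuming the table "xxx": log every micro-batch. During any 5-minute interval in which no new records arrive, no micro-batch is created, so nothing is printed for that trigger.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

spark.readStream.format("delta").table("xxx")
  .writeStream
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // invoked only when this trigger has new records to process
    println(s"batch $batchId: ${batchDF.count()} new rows")
  }
  .start()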
In conclusion, Spark Streaming is not designed to schedule Spark code to run at a specific time interval.
So instead of using Spark Streaming, you should use a scheduler, either an external one such as cron, oozie, airflow, etc., or one in your code. If you need to do it in your code, you can use java.util.Timer as below:
import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._

val t = new java.util.Timer()
val task = new java.util.TimerTask {
  def run(): Unit = {
    // plain batch read: load only the rows from the last 60 minutes
    spark.read.format("delta").table("xxx")
      .filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
      .collect()
      .foreach(println)
  }
}
// first run after 5 minutes, then repeat every 5 minutes;
// call t.cancel() to stop the timer
t.schedule(task, 5 * 60 * 1000L, 5 * 60 * 1000L)
// and run once immediately
task.run()