How to get Last 1 hour data, every 5 minutes, without grouping?

Problem description

How do I trigger every 5 minutes and get the data for the last 1 hour? I came up with this, but it does not seem to give me all the rows from the last 1 hour. My reasoning is:

1. Read the stream,
2. filter the data for the last 1 hour based on the timestamp column,
3. write/print using foreachBatch, and
4. watermark it so that it does not hold on to all the past data.

spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "60 minutes")
  .filter($"ts" > current_timestamp() - expr("INTERVAL 60 minutes"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.collect().foreach(println)
  }
  .start()

Or do I have to use a Window? But I can't seem to get rid of GroupBy if I use Window, and I don't want to group.

spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "1 hour")
  .groupBy(window($"ts", "1 hour"))
  .count()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    print("...entering foreachBatch...\n")
    batchDF.collect().foreach(println)
  }
  .start()

Answer

Instead of using Spark Streaming to execute your Spark code every 5 minutes, you should use either an external scheduler (cron, etc.) or the java.util.Timer API if you want to schedule the processing in your code.

If you use Spark Streaming to schedule code, you will have two issues.

First issue: Spark Streaming processes data only once, so every 5 minutes only the new records are loaded. You can think of bypassing this by using a window function and retrieving an aggregated list of rows with collect_list, or a user-defined aggregate function, but then you will meet the second issue.
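
As a rough illustration of that workaround, a 1-hour window sliding every 5 minutes can carry the rows themselves via collect_list instead of a count. This is only a minimal sketch, reusing the "xxx" Delta table and the "ts" column from the question; the slide duration, output mode, and the columns placed in the struct are assumptions, and it still runs into the second issue described below:

import org.apache.spark.sql.functions.{window, collect_list, struct}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

spark.readStream.format("delta").table("xxx")
  .withWatermark("ts", "1 hour")
  // 1-hour windows that slide every 5 minutes, so each window covers "the last hour"
  .groupBy(window($"ts", "1 hour", "5 minutes"))
  // keep the rows themselves instead of counting them; add whatever columns you need to the struct
  .agg(collect_list(struct($"ts")).as("rows"))
  .writeStream
  .format("console")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .start()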

Second issue: although your processing will be triggered every 5 minutes, the function inside foreachBatch will be executed only if there are new records to process. Without new records during the 5-minute interval between two executions, nothing happens.

In conclusion, Spark Streaming is not designed to schedule Spark code to be executed at a specific time interval.

So instead of using Spark Streaming, you should use a scheduler, either external such as cron, oozie, airflow, etc., or in your code.

If you need to do it in your code, you can use java.util.Timer as below:

import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._

val t = new java.util.Timer()
val task = new java.util.TimerTask {
  def run(): Unit = {
    // batch-read the table and keep only the rows from the last hour
    spark.read.format("delta").table("xxx")
      .filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
      .collect()
      .foreach(println)
  }
}
// repeat every 5 minutes, starting 5 minutes from now...
t.schedule(task, 5*60*1000L, 5*60*1000L)
// ...and run it once immediately
task.run()
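
Note that java.util.Timer runs the task on a non-daemon background thread; if the periodic refresh ever needs to stop, the timer can be shut down with t.cancel().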
