Spark SQL Window over interval of between two specified time boundaries - between 3 hours and 2 hours ago
Question
What is the proper way of specifying a window interval in Spark SQL, using two predefined boundaries?
I am trying to sum up values from my table over a window of "between 3 hours ago and 2 hours ago".
When I run this query:
select *, sum(value) over (
  partition by a, b
  order by cast(time_value as timestamp)
  range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;
That works. I get the results I expect, i.e. sums of values that fall into a 2-hour rolling window.
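For reference, a rough DataFrame-API sketch of that same working 2-hour rolling window (untested; it assumes the table is loaded into a DataFrame called myTempTable and that time_value can be cast to a timestamp, which is then converted to epoch seconds so that numeric range offsets can be used) might look like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order by epoch seconds so the range frame can use plain numeric offsets
val w = Window
  .partitionBy("a", "b")
  .orderBy(col("time_value").cast("timestamp").cast("long"))
  .rangeBetween(-2 * 3600, Window.currentRow)

val withSum = myTempTable.withColumn("sum_value", sum(col("value")).over(w))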
Now, what I need is for that rolling window not to be bound to the current row, but to take into account rows between 3 hours ago and 2 hours ago. I tried:
select *, sum(value) over (
  partition by a, b
  order by cast(time_value as timestamp)
  range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;
but I get an error: extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'}
I also tried:
select *, sum(value) over (
  partition by a, b
  order by cast(time_value as timestamp)
  range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;
but then I get a different error: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
The third option I tried is:
select *, sum(value) over (
  partition by a, b
  order by cast(time_value as timestamp)
  range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;
but it doesn't work either: cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch
I am having difficulties finding documentation for the interval type; this link doesn't say enough, and the other information I found is somewhat half-baked.
Answer
Since range intervals didn't do their thing, I had to turn to an alternative approach. It goes something like this:
- prepare a list of intervals for which the computation needs to be performed
- for each of those intervals, run the computation
- each of those iterations produces a data frame
In my case, I had to run the computation for each hour of the day and combine those "hourly" results, i.e. a list of 24 data frames, into one "daily" data frame.
The code, from a very high-level perspective, looks like this:
val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
  // keep only the rows that fall into the current hourly interval
  val hourData = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
  // do stuff
  // return a data frame
}
hourlyDFs.toSeq.reduce(_.union(_))
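Filling in the placeholders, a minimal self-contained sketch of the same loop-and-union idea (the aggregation inside the loop is only a stand-in, and data, hour, a, b, value are the names used in the snippet above) could be:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// 24 one-hour intervals: (0, 0), (1, 1), ..., (23, 23)
val hoursToStart = 0 to 23
val hoursToEnd   = 0 to 23

val hourlyDFs: Seq[DataFrame] = hoursToStart.zip(hoursToEnd).map { case (hourStart, hourEnd) =>
  data
    .where(col("hour") >= lit(hourStart) && col("hour") <= lit(hourEnd))
    .groupBy(col("a"), col("b"))                    // placeholder per-interval computation
    .agg(sum(col("value")).as("sum_value"))
    .withColumn("hour", lit(hourStart))
}

// combine the 24 "hourly" data frames into one "daily" data frame
val dailyDF = hourlyDFs.reduce(_ union _)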