Flink SQL:投射时间中跳跃窗口上呈指数衰减的移动平均值 [英] An exponentially decaying moving average over a hopping window in Flink SQL: Casting time

查看:381
本文介绍了Flink SQL:投射时间中跳跃窗口上呈指数衰减的移动平均值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

现在我们在Flink中拥有带花式窗口的SQL,我正在尝试使用在Table API和SQL的未来Flink版本中可能实现的功能"所引用的递减移动平均线.从他们的 SQL路线图/预览2017-03帖子中:

Now we have SQL with fancy windowing in Flink, I'm trying to have the decaying moving average referred by "what will be possible in future Flink releases for both the Table API and SQL." from their SQL roadmap/preview 2017-03 post:

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

这是我的尝试(也受到方解石腐烂示例的启发):

Here is my attempt (inspired as well by the calcite decaying example):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

时间是处理时间,它是通过从AppendStream表中创建write_position时获得的过程时间:

Time is processing time, which we get as proctime with the creation of the write_position from an AppendStream Table as:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

我收到此错误:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

我尝试将proctime转换为我知道的所有其他类型(以尝试达到NUMERIC承诺的土地),但我只是找不到如何使它起作用.

I've tried casting proctime to every other type I know of (in an attempt to reach the NUMERIC promised land), and I just can't find how to make it work.

我错过了什么吗? proctime是您无法转换的一种非常特殊的系统更改号"时间吗?如果是这样,仍然必须有某种方法可以将其与HOP_START(proctime,...)值进行比较.

Am I missing something? Is proctime some very special kind of 'system change number' time that you can't convert? If so, there still must be some way to compare it to the HOP_START(proctime,...) value.

推荐答案

您可以使用timestampDiff减去两个时间点(请参见

You can use timestampDiff to subtract two timepoints (see the docs). You use it like this

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

时间点单位可以是秒,分钟,小时,天,月或年.

where timepointunit can be SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

我还没有尝试使用处理时间,但是它确实适用于事件时间字段,所以希望可以.

I haven't tried this with processing time, but it does work with event time fields, so hopefully it will.

这篇关于Flink SQL:投射时间中跳跃窗口上呈指数衰减的移动平均值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆