Having an equivalent to HOP_START inside an aggregation primitive in Flink

Question

I'm trying to compute an exponentially decaying moving average over a hopping window in Flink SQL. I need access to one of the window's boundaries, the HOP_START, inside the aggregate in the following query:

    SELECT                                                                              
      lb_index one_key,
    -- I have access to this one:
      HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
    -- Aggregation primitive:
      SUM(
        Y * EXP(TIMESTAMPDIFF(
          SECOND, 
          proctime, 
    -- This one throws:
          HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      )))
    FROM write_position                                                                
    GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

I get the following stack trace:

11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Aggregate(groupBy: (lb_index), window: (SlidingGroupWindow('w$, 'proctime, 5000.millis, 50.millis)), select: (lb_index, SUM($f2) AS Y, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Calc(select: (lb_index, proctime, *(payload.Y, EXP(/(CAST(/INT(Reinterpret(-(HOP_START(PROCTIME(proctime), 50, 5000), PROCTIME(proctime))), 1000)), 1000))) AS $f2))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using rel#459:DataStreamScan.DATASTREAM.true.Acc(table=[_DataStreamTable_0])
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: HOP_START 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)

The error says the call is unsupported, yet HOP_START works outside the aggregating SUM. That is what makes me think this is a scoping issue.

Now, the thing is: I could transform this expression and do the final processing outside the aggregation, since exp(x+y) = exp(x)*exp(y). But I'm stuck with using TIMESTAMPDIFF (which did wonders in my previous issue). I have not found a way to cast time attributes to numeric types; also, I'm not comfortable exponentiating UNIX timestamps, even if I scale them down.
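To make the algebra behind that workaround concrete, here is a minimal Python sketch (not Flink code). It assumes timestamps are plain seconds as floats, and the names `decayed_sum_direct`, `decayed_sum_factored`, and `ref` are hypothetical, introduced only for illustration. The exponent follows the sign the planner output shows for the query (HOP_START minus proctime). Choosing a fixed reference time `ref` near the data keeps the exponents small, which addresses the worry about exponentiating raw UNIX timestamps; it does not solve the separate problem of converting a time attribute to a number in SQL.

```python
import math

def decayed_sum_direct(samples, window_start):
    """Sum of y * exp(window_start - t): needs the window start inside the aggregate."""
    return sum(y * math.exp(window_start - t) for t, y in samples)

def decayed_sum_factored(samples, window_start, ref):
    """Same value, with the window start applied only after aggregation.

    `ref` is a hypothetical fixed reference time chosen close to the data,
    so neither exponent grows large.
    """
    # Aggregation step: no reference to the window start.
    partial = sum(y * math.exp(ref - t) for t, y in samples)
    # Post-processing step, outside the aggregate:
    # exp(window_start - t) = exp(window_start - ref) * exp(ref - t)
    return math.exp(window_start - ref) * partial

# Illustrative (time_in_seconds, value) pairs; not data from the question.
samples = [(1000.0, 2.0), (1001.5, 3.0), (1003.0, 1.0)]
window_start = 1000.0
ref = 1002.0

direct = decayed_sum_direct(samples, window_start)
factored = decayed_sum_factored(samples, window_start, ref)
```

Both functions agree up to floating-point rounding, so the window start can in principle be moved out of the SUM, provided the per-row exponent can be computed relative to some fixed reference.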

Anyway, this workaround would be rather clunky, and there might be another way. I don't know how to arrange scopes in this SQL so that the expression is still inside the window scope and can read the start time without throwing.

Recommended answer

I suggest you experiment with HOP_PROCTIME() rather than HOP_START(). The differences are explained here, but the effect will be that you'll have a proctime attribute rather than a timestamp, which I'm hoping will make TIMESTAMPDIFF happy.
