Flink 自定义触发器给出意外输出 [英] Flink Custom Trigger giving Unexpected Output

查看:25
本文介绍了Flink 自定义触发器给出意外输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个 Trigger,它第一次在 20 秒内触发,之后每五秒触发一次.我使用了 GlobalWindows 和一个自定义的 Trigger

I want to create a Trigger which gets fired in 20 seconds for the first time and in every five seconds after that. I have used GlobalWindows and a custom Trigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())

这是TradeTrigger中的代码:

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

    }

    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }


    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

}

所以基本上,当 flagfalse 时,即第一次,Trigger 应该在 20 秒内触发并设置 flagtrue.从下一次开始,它应该每 5 秒触发一次.

So basically, when flag is false, i.e. the first time, the Trigger should get fired in 20 seconds and set the flag to true. From the next time, it should get fired every 5 seconds.

我面临的问题是,每次触发 Trigger 时,我只在输出中收到一条消息.也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息.我期望每次触发时输出中有 20 条消息.

The problem I am facing is, I am getting only one message in the output every time the Trigger is fired. That is, I get a single message after 20 seconds and single messages in every five seconds. I am expecting twenty messages in the output on each triggering.

如果我使用 .timeWindow(Time.seconds(5)) 并创建一个 5 秒的时间窗口,我会每 5 秒输出 20 条消息.请帮助我正确获取此代码.有什么我遗漏的吗?

If I use .timeWindow(Time.seconds(5)) and create a time window of five seconds, I get 20 messages in output every 5 seconds. Please help me get this code right. Is there something I am missing?

推荐答案

在来自 Fabian 和 Flink 邮件列表的答案的帮助下得到了解决.通过 TriggerContext 将状态存储在 ValueState 变量中.检查onEvent()方法中的变量,如果是第一次,注册一个processingTimeTimer比当前时间多20秒并更新状态.在 onProcessingTime 方法中,注册另一个 ProcessingTimeTimer 比当前时间多 5 秒,更新状态并触发 Window.

Got it working with the help of the answer from Fabian and Flink mailing lists. Stored the state in a ValueState variable through the TriggerContext. Checked the variable in onEvent() method and if it was the first time, registered a processingTimeTimer for 20 seconds more than the current time and updated the state. In the onProcessingTime method, registered another ProcessingTimeTimer for 5 seconds more than current time, updated the state and fired the Window.

这篇关于Flink 自定义触发器给出意外输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆