Apache Flink - Event time windows


Question

I want to create keyed windows in Apache Flink such that the window for each key fires n minutes after the first event for that key arrives. Can this be done with the event time characteristic? (Processing time depends on the system clock, and it is uncertain when the first event will arrive.) If so, please explain how to assign event timestamps and watermarks to the events, and how to invoke the process window function after n minutes.

Below is part of the code, which should give an idea of what I am currently doing:

            //Make keyed events so as to start a window for a key
            KeyedStream<SourceData, Tuple> keyedEvents = 
                    env.addSource(new MySource(configData),"JSON Source")
                    .assignTimestampsAndWatermarks(new MyTimeStamps())
                    .setParallelism(1)
                    .keyBy("service");


            //Start a window for windowTime time
            DataStream<ResultData> resultData=
                    keyedEvents
                    .timeWindow(Time.minutes(winTime))
                    .process(new ProcessEventWindow(configData))
                    .name("Event Collection Window")
                    .setParallelism(25);

So, how should I assign the event time and watermarks so that each window uses the event time of the first event as its starting point and fires 10 minutes later? (The start time of the first event can differ from key to key.) Any help would be really appreciated.

            /------------ ( window of 10 minutes )
Streams    |------------ ( window of 10 minutes )
            \------------ ( window of 10 minutes )

Edit: the class I used for assigning timestamps and watermarks

public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

    @Override
    public long extractTimestamp(SourceData element, long previousElementTimestamp) {

          //Will return epoch of currentTime
        return GlobalUtilities.getCurrentEpoch();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // TODO Auto-generated method stub
        //Will return epoch of currentTime + 10 minutes
        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
    }

}
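
Note that an assigner like the one above stamps every element with the current wall-clock time, so it behaves like processing time, and a watermark 10 minutes ahead of the clock claims that no earlier events will ever arrive. In event time the timestamp is normally read from a field of the record itself, and the watermark trails the highest timestamp seen so far. The following is a minimal plain-Java sketch of that idea with no Flink dependency; `Event`, `eventTimeMillis`, and `BoundedLatenessWatermark` are hypothetical names introduced for illustration, and the 5-second lateness bound is an arbitrary assumption.

```java
/** Plain-Java illustration (no Flink dependency) of event-derived timestamps and a trailing watermark. */
class WatermarkSketch {

    /** Hypothetical event shape: the timestamp travels inside the record. */
    static final class Event {
        final String service;
        final long eventTimeMillis;

        Event(String service, long eventTimeMillis) {
            this.service = service;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** Tracks the highest event time seen and reports a watermark that trails it. */
    static final class BoundedLatenessWatermark {
        private final long maxLatenessMillis;
        private long maxSeenMillis = Long.MIN_VALUE;

        BoundedLatenessWatermark(long maxLatenessMillis) {
            this.maxLatenessMillis = maxLatenessMillis;
        }

        /** The timestamp comes from the event itself, not from the system clock. */
        long extractTimestamp(Event e) {
            maxSeenMillis = Math.max(maxSeenMillis, e.eventTimeMillis);
            return e.eventTimeMillis;
        }

        /** The watermark asserts that no event older than this value is still expected. */
        long currentWatermark() {
            return maxSeenMillis == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxSeenMillis - maxLatenessMillis;
        }
    }

    public static void main(String[] args) {
        BoundedLatenessWatermark wm = new BoundedLatenessWatermark(5_000L);
        wm.extractTimestamp(new Event("billing", 100_000L));
        wm.extractTimestamp(new Event("billing", 90_000L)); // out of order, does not move the watermark
        System.out.println(wm.currentWatermark()); // prints 95000
    }
}
```

In Flink 1.x the same pattern is available as `BoundedOutOfOrdernessTimestampExtractor`, so a hand-written tracker like this is mainly useful for understanding what the watermark means.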

Recommended answer

I think for your use case it would be best to use a ProcessFunction. You can register an EventTimeTimer when the first event for a key arrives, then emit the results in the onTimer method.

Something like:

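import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Targets the Flink 1.x API used in the question. Apply it to a keyed stream:
// keyed state and timers are scoped to the key of the element being processed.
public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {

    private static final long WINDOW_MILLIS = 10 * 60 * 1000L; // 10 minutes

    // per-key aggregate; null until the first event for the key arrives
    private transient ValueState<ResultData> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<>("result", ResultData.class));
    }

    @Override
    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
        throws Exception {

        // retrieve the current aggregate
        ResultData current = state.value();
        if (current == null) {
            // first event arrived
            current = new ResultData();
            // register end of window, relative to the first event's timestamp
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WINDOW_MILLIS);
        }

        // update the state's aggregate; add(...) is a placeholder for
        // whatever merge logic ResultData provides (an assumption, not a Flink API)
        current.add(value);

        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
        throws Exception {

        // get the state for the key that scheduled the timer
        ResultData result = state.value();

        out.collect(result);

        // reset the window state so the next event for this key starts a new window
        state.clear();
    }
}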
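
To see how the timer approach behaves without running a cluster, here is a small plain-Java simulation of the same logic. It is only a sketch under stated assumptions: it has no Flink dependency, `FirstEventWindowSketch`, `onEvent`, and `advanceWatermark` are hypothetical names, the aggregate is simply an event count, and the watermark is passed in by hand. It relies on the rule that an event-time timer fires once the watermark reaches or passes the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (no Flink dependency) of per-key windows anchored at the first event. */
class FirstEventWindowSketch {
    private final long windowMillis;
    private final Map<String, Long> timerByKey = new HashMap<>();    // key -> firing time
    private final Map<String, Integer> countByKey = new HashMap<>(); // key -> aggregate (a count here)

    FirstEventWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Mirrors processElement: only the first event of a key schedules the timer. */
    void onEvent(String key, long eventTimeMillis) {
        timerByKey.putIfAbsent(key, eventTimeMillis + windowMillis);
        countByKey.merge(key, 1, Integer::sum);
    }

    /** Mirrors onTimer: each timer at or before the watermark fires, emits, and clears its state. */
    List<String> advanceWatermark(long watermarkMillis) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timerByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= watermarkMillis) {
                String key = timer.getKey();
                emitted.add(key + "=" + countByKey.remove(key));
                it.remove(); // the next event for this key opens a fresh window
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        FirstEventWindowSketch windows = new FirstEventWindowSketch(10 * 60 * 1000L);
        windows.onEvent("billing", 0L);        // billing window ends at 600000
        windows.onEvent("search", 240_000L);   // search window ends at 840000
        windows.onEvent("billing", 300_000L);  // still inside the billing window
        System.out.println(windows.advanceWatermark(600_000L)); // prints [billing=2]
        System.out.println(windows.advanceWatermark(840_000L)); // prints [search=1]
    }
}
```

Each key gets its own 10-minute window starting at its own first event, which is the behavior the question asks for. Nothing is emitted until the watermark advances, so in a real job the results depend on timestamps and watermarks being derived from the events.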
