Apache Flink - Event time windows
Problem description
I want to create keyed windows in Apache Flink such that the window for each key is executed n minutes after the first event for that key arrives. Can this be done using event-time characteristics (since processing time depends on the system clock, and it is uncertain when the first event will arrive)? If it is possible, please explain how to assign event time and watermarks to the events, and how to call the process window function after n minutes.
Below is part of the code, which should give you an idea of what I am currently doing:
//Make keyed events so as to start a window for a key
KeyedStream<SourceData, Tuple> keyedEvents =
env.addSource(new MySource(configData),"JSON Source")
.assignTimestampsAndWatermarks(new MyTimeStamps())
.setParallelism(1)
.keyBy("service");
//Start a window of winTime minutes for each key
DataStream<ResultData> resultData=
keyedEvents
.timeWindow(Time.minutes(winTime))
.process(new ProcessEventWindow(configData))
.name("Event Collection Window")
.setParallelism(25);
So, how would I assign the event time and watermark such that the window uses the event time of the first event as its starting point and executes 10 minutes later (the start time of the first event can be different for different keys)? Any help would be really appreciated.
         /------------ ( window of 10 minutes )
Streams |------------ ( window of 10 minutes )
         \------------ ( window of 10 minutes )
Edit: the class I used for assigning timestamps and watermarks:
public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {
@Override
public long extractTimestamp(SourceData element, long previousElementTimestamp) {
//Will return epoch of currentTime
return GlobalUtilities.getCurrentEpoch();
}
@Override
public Watermark getCurrentWatermark() {
//Will return epoch of currentTime + 10 minutes
return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
}
}
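The assigner above stamps every element with the current wall-clock time, which is effectively processing time, and sets the watermark 10 minutes ahead of that clock. Because a watermark W asserts that no more events with timestamps at or before W will arrive, every subsequent element (stamped with "now") falls behind the watermark and is treated as late. With event time, the timestamp should come from a field of the event itself, and the watermark should trail the largest timestamp seen so far by a bounded delay. In Flink 1.11+ this is what `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` combined with `withTimestampAssigner(...)` provides; which SourceData field holds the event time is an assumption left to your data. The plain-Java sketch below is not Flink code (the class name and the 5-second bound are illustrative assumptions); it only mimics, as far as I know, the arithmetic of Flink's built-in bounded-out-of-orderness generator so the watermark progression is visible.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (not Flink API) of a bounded-out-of-orderness watermark generator:
 * timestamps come from the events themselves, and the watermark trails the largest
 * timestamp seen so far by a fixed bound.
 */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low so that currentWatermark() cannot underflow before the first event.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called per event with the event's own timestamp (e.g. a field parsed from the JSON). */
    public void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    /**
     * Called periodically. The -1 keeps an event exactly maxOutOfOrdernessMs behind the
     * maximum from being counted as late.
     */
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(5_000);
        long[] eventTimes = {10_000, 12_000, 11_000, 20_000}; // 11_000 arrives out of order
        List<Long> watermarks = new ArrayList<>();
        for (long t : eventTimes) {
            gen.onEvent(t);
            watermarks.add(gen.currentWatermark());
        }
        System.out.println(watermarks); // [4999, 6999, 6999, 14999]
    }
}
```

Note that the out-of-order event at 11000 does not move the watermark backwards, and it is not late, because the watermark (6999) is still below its timestamp.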
Recommended answer
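I think for your use case it would be best to use a ProcessFunction (on a keyed stream, a KeyedProcessFunction). What you could do is register an event-time timer when the first event for a key arrives; the timer fires once the watermark passes the registered timestamp. Then emit the results in the onTimer method.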
Something like:
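import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Key type Tuple matches keyBy("service"); use with keyedEvents.process(new ProcessFunctionImpl())
public class ProcessFunctionImpl extends KeyedProcessFunction<Tuple, SourceData, ResultData> {
    // per-key aggregate; Flink scopes this state to the current key automatically
    private transient ValueState<ResultData> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<>("window-aggregate", ResultData.class));
    }

    @Override
    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
            throws Exception {
        // retrieve the current aggregate
        ResultData current = state.value();
        if (current == null) {
            // first event arrived
            current = new ResultData();
            // register end of window: 10 minutes of event time after the first event
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
        }
        // update the state's aggregate (add() is a hypothetical ResultData method)
        current.add(value);
        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
            throws Exception {
        // get the state for the key that scheduled the timer
        ResultData result = state.value();
        out.collect(result);
        // reset the window state so the next event for this key starts a new window
        state.clear();
    }
}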
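To make the firing rule concrete: an event-time timer fires once the watermark reaches the timer's timestamp, so each key's "window" closes 10 minutes of event time after that key's own first event, independently of other keys. The plain-Java simulation below is not Flink API; the class name, String keys, and long sums standing in for SourceData/ResultData are assumptions. It reproduces the processElement/onTimer logic with one map acting as the keyed state and another acting as the timer service.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (not Flink API) of the answer's pattern: the first event of a key
 * registers a timer at firstTimestamp + windowLength, later events are folded into per-key
 * state, and the timer fires once the watermark reaches its timestamp.
 */
public class FirstEventTimerSimulation {

    private final long windowLengthMs;
    // Stand-in for keyed ValueState: key -> running sum (ResultData in the answer).
    private final Map<String, Long> state = new HashMap<>();
    // Stand-in for the timer service: key -> registered event-time timer.
    private final Map<String, Long> timers = new HashMap<>();
    // Stand-in for the Collector: "key@timerTimestamp=aggregate".
    private final List<String> output = new ArrayList<>();

    public FirstEventTimerSimulation(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    /** Mirrors processElement: only the first event of a key (re)starts its window. */
    public void processElement(String key, long timestampMs, long value) {
        Long current = state.get(key);
        if (current == null) {
            current = 0L;
            timers.put(key, timestampMs + windowLengthMs);
        }
        state.put(key, current + value);
    }

    /** Mirrors watermark progress: fires every timer whose timestamp is <= the watermark. */
    public void advanceWatermark(long watermarkMs) {
        List<String> dueKeys = new ArrayList<>();
        for (Map.Entry<String, Long> e : timers.entrySet()) {
            if (e.getValue() <= watermarkMs) {
                dueKeys.add(e.getKey());
            }
        }
        // Fire in timestamp order (ties broken by key, to keep the output deterministic).
        dueKeys.sort(Comparator.comparing((String k) -> timers.get(k))
                .thenComparing(Comparator.<String>naturalOrder()));
        for (String key : dueKeys) {
            onTimer(key, timers.remove(key));
        }
    }

    /** Mirrors onTimer: emit the aggregate and clear the key's state. */
    private void onTimer(String key, long timerTimestampMs) {
        output.add(key + "@" + timerTimestampMs + "=" + state.remove(key));
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        FirstEventTimerSimulation sim = new FirstEventTimerSimulation(10 * 60 * 1000); // 10 minutes
        sim.processElement("a", 0, 1);        // a's timer: 600000
        sim.processElement("b", 120_000, 5);  // b's timer: 720000
        sim.processElement("a", 300_000, 2);
        sim.processElement("b", 650_000, 3);
        sim.advanceWatermark(599_999);        // nothing fires yet
        sim.advanceWatermark(600_000);        // a fires
        sim.processElement("a", 700_000, 4);  // new window for a, timer: 1300000
        sim.advanceWatermark(720_000);        // b fires
        sim.advanceWatermark(Long.MAX_VALUE); // end of input: a fires again
        System.out.println(sim.output());     // [a@600000=3, b@720000=8, a@1300000=4]
    }
}
```

Key a's first window closes at 600000 ms, and its next event starts a fresh window, while key b's window closes at 720000 ms because its first event arrived 2 minutes later in event time.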