Processing time windows don't work on finite data sources in Apache Flink
Problem description
I'm trying to apply a very simple window function to a finite data stream in Apache Flink (locally, no cluster). Here's the example:
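import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  // Group everything that arrives within the same 1-second wall-clock window
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      // Emit the sorted contents of each window as a single string
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()
env.execute()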
Here, I try to group all the elements that arrive within one second into a window and then just print these groups.
I assumed that all the elements would be produced in much less than one second and end up in one window, so there would be one incoming element in print(). However, nothing is printed at all when I run this.
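A minimal sketch of that expectation, in plain Scala with no Flink dependency: it assigns each arrival time to a one-second tumbling window the way a zero-offset tumbling assigner does (start = t - t % size, assuming non-negative millisecond timestamps) and exposes the window's last millisecond (end - 1), which is where ProcessingTimeTrigger registers its timer. TumblingModel, assign and windowsFor are hypothetical names. It also shows that "one window" is likely but not guaranteed: five closely spaced arrivals can still straddle a second boundary.

```scala
// Pure-Scala model of tumbling-window assignment; no Flink classes are used.
// Assumes non-negative epoch-millisecond timestamps and a zero window offset.
object TumblingModel {
  final case class Window(start: Long, end: Long) {
    // Last millisecond that still belongs to the window (end is exclusive).
    def maxTimestamp: Long = end - 1
  }

  // Window that an element observed at time t falls into.
  def assign(t: Long, size: Long): Window = {
    val start = t - t % size
    Window(start, start + size)
  }

  // Distinct windows for a batch of arrival times, in order of first appearance.
  def windowsFor(arrivals: Seq[Long], size: Long): Seq[Window] =
    arrivals.map(assign(_, size)).distinct

  def main(args: Array[String]): Unit = {
    // Five elements arriving within 4 ms, well inside one second...
    val early = Seq(1000L, 1001L, 1002L, 1003L, 1004L)
    // ...and the same spacing straddling a second boundary.
    val straddling = Seq(1998L, 1999L, 2000L, 2001L, 2002L)
    println(windowsFor(early, 1000L))      // one window: [1000, 2000)
    println(windowsFor(straddling, 1000L)) // two windows: [1000, 2000) and [2000, 3000)
  }
}
```

In the first case the timer for [1000, 2000) would be due at 1999, so no output can appear before the wall clock reaches that point, even though every element arrived within a few milliseconds.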
If I remove all the windowing stuff, like this:
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()
env.execute()
I see the elements printed after the run. I also tried this with a file source; no difference.
The default parallelism on my machine is 6. If I experiment with the level of parallelism and delays, like this:
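val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  // Slow each element down by 1.5 s so the elements spread over several windows
  .map { x => Thread.sleep(1500); x }
  // ...followed by the same windowAll/trigger/process/print steps and env.execute() as above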
I am able to get some (but not all) elements into groups, which are printed.
My first assumption is that the source finishes much faster than 1 second and the task is shut down before the window's timer fires. Debugging showed that the line in ProcessingTimeTrigger that registers the timer is reached. Shouldn't all registered timers fire before a task shuts down (at least this is the impression I got from the code)?
Could you please help me understand this and make this more deterministic?
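The shutdown hypothesis can be written down as a small plain-Scala model. A window fires when the clock reaches its last millisecond (end - 1); the key assumption, which is exactly what is in question here and not confirmed Flink behavior, is that any timer still pending when the job finishes is discarded. ShutdownModel and firedWindows are hypothetical names, and the sleeping-map case is simplified to a single thread instead of parallelism 2.

```scala
// Hypothetical model of the shutdown hypothesis: a processing-time window fires
// at end - 1, and timers still pending when the job ends are ASSUMED to be
// dropped rather than fired. No Flink classes are used.
object ShutdownModel {
  // Window start for arrival time t (non-negative millis, zero offset).
  def windowStart(t: Long, size: Long): Long = t - t % size

  // Groups arrivals by window and keeps only the groups whose timer
  // (start + size - 1) is due no later than the job's end time.
  def firedWindows(arrivals: Seq[Long], size: Long, jobEnd: Long): Seq[Seq[Long]] =
    arrivals
      .groupBy(windowStart(_, size))
      .toSeq
      .sortBy(_._1)
      .collect { case (start, elems) if start + size - 1 <= jobEnd => elems }

  def main(args: Array[String]): Unit = {
    // All five elements arrive within a few ms and the job ends right after:
    // the single window's timer (1999) is still pending, so nothing fires.
    println(firedWindows(Seq(1000L, 1001L, 1002L, 1003L, 1004L), 1000L, jobEnd = 1010L))
    // Elements spaced 1500 ms apart (like the sleeping map, on one thread):
    // earlier windows fire, the window holding the last element does not.
    println(firedWindows(Seq(0L, 1500L, 3000L, 4500L, 6000L), 1000L, jobEnd = 6010L))
  }
}
```

Under this model the first run prints nothing and the delayed run prints every group except the last, which is consistent with the observations above but does not prove the model.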
Update #1, 23.09.2018:
I also experimented with event time windows instead of processing time windows. If I do this:
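import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    // Timestamp = character code in milliseconds: 'a' -> 97 ... 'e' -> 101
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })
  .print()
env.execute()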
Then again nothing is printed. The debugger shows that the trigger's onElement is called for every element, but onEventTime is never called.
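The event-time numbers can be checked by hand. 'a'.toInt is 97, so the five timestamps are 97 to 101 ms, all inside the window [0, 1000). EventTimeTrigger fires a window once the watermark reaches its last millisecond (999 here). Assuming AscendingTimestampExtractor emits watermarks one millisecond behind the highest timestamp seen, the largest watermark the extractor itself produces is 100. The plain-Scala sketch below (EventTimeModel and its methods are hypothetical names, no Flink dependency) replays that arithmetic; it deliberately leaves out any end-of-input watermark, since the observed output suggests none reached the window operator.

```scala
// Plain-Scala replay of the event-time arithmetic; no Flink classes are used.
// Assumptions: zero-offset tumbling windows; a window fires when the
// watermark >= its last millisecond (end - 1); the extractor's watermark is the
// highest timestamp seen minus 1; no end-of-input watermark is modeled.
object EventTimeModel {
  def windowStart(ts: Long, size: Long): Long = ts - ts % size

  // Start timestamps of the windows that the given watermark has fired.
  def fired(timestamps: Seq[Long], size: Long, watermark: Long): Seq[Long] =
    timestamps.map(windowStart(_, size)).distinct.filter(s => s + size - 1 <= watermark)

  // Highest watermark the ascending extractor emits for these timestamps.
  def maxWatermark(timestamps: Seq[Long]): Long = timestamps.max - 1

  def main(args: Array[String]): Unit = {
    val ts = "abcde".map(_.toLong) // 97, 98, 99, 100, 101
    val wm = maxWatermark(ts)      // 100
    println(s"windows: ${ts.map(windowStart(_, 1000L)).distinct}, watermark: $wm")
    println(s"fired: ${fired(ts, 1000L, wm)}") // empty: window [0, 1000) needs watermark 999
  }
}
```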
Also, if I modify the timestamp extractor to make bigger steps:
element.charAt(0).toInt * 1000
all the elements except the last one are printed (one element per group, which is expected).
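A minimal sketch of why the last element is missing, assuming zero-offset tumbling windows, that a window fires once the watermark reaches its last millisecond (end - 1), that AscendingTimestampExtractor emits watermarks one millisecond behind the highest timestamp seen, and that no end-of-input watermark reaches the window. Multiplying by 1000 puts each element in its own window, [97000, 98000) through [101000, 102000). The highest extractor watermark is then 100999, which covers the last millisecond of every window except the final one (101999). ScaledEventTimeModel and firedAndPending are hypothetical names; no Flink classes are used.

```scala
// Plain-Scala replay of the "* 1000" variant; no Flink classes are used.
// Assumptions: zero-offset tumbling windows; a window fires when
// watermark >= end - 1; the extractor's watermark is max timestamp - 1;
// no end-of-input watermark is modeled.
object ScaledEventTimeModel {
  val size = 1000L

  def windowStart(ts: Long): Long = ts - ts % size

  // Splits window start times into (fired, pending) for a given watermark.
  def firedAndPending(timestamps: Seq[Long], watermark: Long): (Seq[Long], Seq[Long]) =
    timestamps.map(windowStart).distinct.partition(_ + size - 1 <= watermark)

  def main(args: Array[String]): Unit = {
    val ts = "abcde".map(_.toLong * 1000) // 97000, 98000, ..., 101000
    val watermark = ts.max - 1            // 100999
    val (fired, pending) = firedAndPending(ts, watermark)
    println(s"fired: $fired")     // four windows, starting 97000 to 100000
    println(s"pending: $pending") // the window starting at 101000 never fires
  }
}
```

The model reproduces both observations (nothing printed with the small timestamps, all but the last element printed with the scaled ones), which points at the final window waiting for a watermark that never arrives.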
Update #2, 23.09.2018: