Processing time windows don't work on finite data sources in Apache Flink

Problem description

I'm trying to apply a very simple window function to a finite data stream in Apache Flink (locally, no cluster). Here's the example:

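import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  // Group everything that arrives within the same 1-second processing-time window
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      // Emit one sorted list per window
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()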

Here, I try to group all the elements that arrive within the same one-second window and then just print these groups.

I assumed that all the elements would be produced in much less than one second and end up in one window, so print() would receive a single element. However, nothing is printed at all when I run this.

If I remove all the windowing stuff, like

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()

I see the elements printed after the run. I also tried this with a file source; it made no difference.

The default parallelism on my machine is 6. If I experiment with the level of parallelism and delays, like this

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }

I am able to get some (but not all) elements into groups, which are printed.

My first assumption is that the source finishes much faster than one second and the task is shut down before the window's timer fires. Debugging showed that the line in ProcessingTimeTrigger that registers the timer is reached. Shouldn't all registered timers fire before a task shuts down (at least, that is the impression I got from the code)?
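The hypothesis can be made concrete with a simplified model, sketched here in plain Scala rather than with Flink (all names, the arrival times and the shutdown time are made up for illustration, and parallelism is ignored): each processing-time window registers a timer for its last millisecond, and when the job shuts down, timers that are not yet due are simply dropped. With elements arriving roughly 1.5 s apart, as in the `Thread.sleep(1500)` experiment, every window except the one still open at shutdown gets emitted, which would be consistent with seeing some but not all groups.

```scala
// Simplified model (not Flink code): processing-time tumbling windows whose
// timers are silently dropped if the job shuts down before they are due.
val windowSizeMs = 1000L

// Hypothetical helper: start of the epoch-aligned window containing `ts`.
def windowStartOf(ts: Long, size: Long): Long = ts - ts % size

// Hypothetical arrival times (ms since job start), roughly 1.5 s apart.
val arrivals = List("a" -> 0L, "b" -> 1500L, "c" -> 3000L, "d" -> 4500L, "e" -> 6000L)

// Hypothetical moment the finite source is done and the job shuts down,
// shortly after the last element arrives.
val shutdownAtMs = 6050L

val windows = arrivals.groupBy { case (_, t) => windowStartOf(t, windowSizeMs) }

// A window's timer is due at its last millisecond (start + size - 1).
// In this model only timers that are due before shutdown get to run.
val emitted: List[List[String]] = windows.toList.sortBy(_._1).collect {
  case (start, elems) if start + windowSizeMs - 1 <= shutdownAtMs =>
    elems.map(_._1).sorted
}

println(emitted) // every group except the one containing "e"
```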

Could you please help me understand this and make it more deterministic?

Update #1, 23.09.2018:

I also experimented with event time windows instead of processing time windows. If I do this:

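import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    // Use the character code ('a' = 97, ..., 'e' = 101) as the timestamp in milliseconds
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })
  .print()

env.execute()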

Then again nothing is printed. The debugger shows that the trigger's onElement is called for every element, but onEventTime is never called.

Also, if I modify the timestamp extractor to make bigger steps:

element.charAt(0).toInt * 1000

all the elements except the last one are printed (one element per group, as expected).
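Why the two runs group so differently follows from how tumbling windows are aligned. Below is a plain-Scala sketch (no Flink dependency; `windowStartOf` and `groups` are hypothetical helpers) assuming epoch-aligned one-second windows with zero offset, i.e. `start = ts - ts % size`. Without scaling, the timestamps are 97 to 101 ms and all fall into the window [0, 1000); multiplied by 1000 they become 97000 to 101000 ms, one per window.

```scala
// Plain-Scala sketch of tumbling-window assignment (not Flink code).
// Assumes windows aligned to the epoch with zero offset, as for
// TumblingEventTimeWindows.of(Time.seconds(1)).
val windowSizeMs = 1000L

// Hypothetical helper: start of the tumbling window containing `ts` (ts >= 0).
def windowStartOf(ts: Long, size: Long): Long = ts - ts % size

val elements = List("a", "b", "c", "d", "e")

// Hypothetical helper: timestamps as in the question (character code, optionally scaled).
def groups(scale: Long): Map[Long, List[String]] =
  elements.groupBy(e => windowStartOf(e.charAt(0).toLong * scale, windowSizeMs))

val unscaled = groups(1L)    // 'a' = 97 ... 'e' = 101
val scaled   = groups(1000L) // 97000 ... 101000

println(unscaled)                   // a single window starting at 0
println(scaled.toList.sortBy(_._1)) // five windows, one element each
```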

Update #2, 23.09.2018:

Update #1 in

Recommended answer

When a finite source reaches the end, if you are using event time then a Watermark with timestamp Long.MAX_VALUE will be injected, which will cause all event time timers to fire. However, with processing time, Flink will wait for all currently firing timers to complete their actions, and then exit.
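The event-time half of this can be modelled in a few lines of plain Scala (an illustration, not Flink's implementation; `openWindows` and `firedBy` are hypothetical names): an event-time window fires once a watermark reaches its last timestamp (`end - 1`), so a final watermark of `Long.MaxValue` flushes every open window, whereas a watermark that stops short of a window's end leaves that window unfired. The windows reuse the question's scaled timestamps (97000 to 101000 ms); the watermark value 100999 is a made-up example of a watermark that stalls just below the last element.

```scala
// Plain-Scala model (not Flink code) of event-time windows firing on watermarks.
val windowSizeMs = 1000L

// Window start -> contents, using the question's scaled timestamps.
val openWindows: Map[Long, List[String]] =
  List("a", "b", "c", "d", "e")
    .groupBy { e => val ts = e.charAt(0).toLong * 1000; ts - ts % windowSizeMs }

// A window fires when the watermark covers its last timestamp (start + size - 1).
def firedBy(watermark: Long): List[List[String]] =
  openWindows.toList.sortBy(_._1).collect {
    case (start, elems) if start + windowSizeMs - 1 <= watermark => elems
  }

val atEndOfInput = firedBy(Long.MaxValue) // the watermark injected when a finite source ends
val stalled      = firedBy(100999L)       // hypothetical watermark just below "e"

println(atEndOfInput) // all five windows
println(stalled)      // every window except the last
```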

As you suspected, you're not seeing any output because the source finishes very quickly.
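The effect resembles what happens with a plain JDK scheduler that discards pending delayed tasks on shutdown. This is only an analogy and does not use Flink (`timerFires` is a hypothetical helper): a timer is scheduled one second ahead, standing in for the window's end-of-window timer, and the executor is shut down immediately, as when a finite source finishes, so the timer never runs. Keeping the executor alive past the deadline before shutting it down, which is roughly what the dummy source in the hack below achieves, lets the timer fire.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Analogy only: a JDK scheduled executor standing in for a task's timer service.
def timerFires(keepAliveMs: Long): Boolean = {
  val fired = new AtomicBoolean(false)
  val timers = new ScheduledThreadPoolExecutor(1)
  // Drop delayed tasks that are still pending when shutdown() is called.
  timers.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

  // "Window timer" due one second from now.
  timers.schedule(new Runnable { def run(): Unit = fired.set(true) }, 1000L, TimeUnit.MILLISECONDS)

  Thread.sleep(keepAliveMs) // how long the "job" stays alive after all data is in
  timers.shutdown()
  timers.awaitTermination(5L, TimeUnit.SECONDS)
  fired.get
}

val shutDownImmediately = timerFires(0L)    // the source finished at once
val keptAlive           = timerFires(1500L) // something kept the job running

println(s"immediate shutdown fired: $shutDownImmediately, kept alive fired: $keptAlive")
```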

Deterministic behavior is straightforward with event time processing; with processing time it's not really achievable.
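A small plain-Scala illustration of why (a model, not Flink code; `Record`, `byEventTime` and `byProcessingTime` are hypothetical names, and the timestamps and arrival times are invented): event-time grouping depends only on the timestamps carried by the elements, so it comes out the same however fast or slowly they arrive, while processing-time grouping depends on when each element happens to be processed.

```scala
// Model (not Flink code): group elements into 1-second tumbling windows either by
// the timestamp they carry (event time) or by when they arrive (processing time).
val windowSizeMs = 1000L
def windowStartOf(ts: Long): Long = ts - ts % windowSizeMs

final case class Record(value: String, eventTimeMs: Long, arrivalMs: Long)

def byEventTime(rs: List[Record]): Set[Set[String]] =
  rs.groupBy(r => windowStartOf(r.eventTimeMs)).values.map(_.map(_.value).toSet).toSet

def byProcessingTime(rs: List[Record]): Set[Set[String]] =
  rs.groupBy(r => windowStartOf(r.arrivalMs)).values.map(_.map(_.value).toSet).toSet

// Invented data: the same events in two runs with different arrival timing.
val eventTimes = List("a" -> 100L, "b" -> 200L, "c" -> 1100L)
val fastRun = eventTimes.map { case (v, et) => Record(v, et, arrivalMs = 10L) }
val slowRun = eventTimes.zipWithIndex.map { case ((v, et), i) => Record(v, et, arrivalMs = i * 900L) }

println(byEventTime(fastRun) == byEventTime(slowRun))           // true: same groups in both runs
println(byProcessingTime(fastRun) == byProcessingTime(slowRun)) // false: the groups differ
```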

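But here's a hack that more or less works: union the finite stream with an endless dummy source, so that the job, and with it the window operator, keeps running long enough for the processing-time timers to fire. The dummy elements are filtered out before the window. Because the dummy source never finishes, the job never ends on its own either; you have to cancel it.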

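import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

val s = env.fromCollection(List("a", "b", "c", "d", "e"))
// Endless dummy source: keeps the job alive so that processing-time timers can fire
val t = env.addSource((context: SourceContext[String]) => {
  while(true) {
    Thread.sleep(100)
    context.collect("dummy")
  }
})

s.union(t)
  .filter(_ != "dummy") // drop the keep-alive elements before windowing
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()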
