Processing time windows don't work on finite data sources in Apache Flink


Question

I'm trying to apply a very simple window function to a finite data stream in Apache Flink (locally, no cluster). Here's the example:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))

  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })

  .print()

env.execute()

Here, I try to group all the elements that arrive in the window within one second and then just print these groups.

I assumed that all the elements would be produced in much less than one second and land in one window, so print() would receive a single element. However, nothing is printed at all when I run this.

If I remove all the windowing stuff, like

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()

I see the elements printed after the run. I also tried this with a file source; it made no difference.

The default parallelism on my machine is 6. If I experiment with the level of parallelism and with delays, like this

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }

I am able to get some, but not all, elements into groups, which are printed.

My first assumption is that the source finishes in much less than 1 second and the task is shut down before the window's timer fires. Debugging showed that the timer-setting line in ProcessingTimeTrigger is reached. Shouldn't all started timers finish before a task shuts down (at least this is the impression I got from the code)?

Could you please help me understand this and make it more deterministic?

Update #1, 23.09.2018:

I also experimented with event time windows instead of processing time windows. If I do this:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })

  .print()

env.execute()

Then again nothing is printed. The debugger shows that the trigger's onElement is called for every element, but onEventTime is never called.

Also, if I modify the timestamp extractor to make bigger steps:

element.charAt(0).toInt * 1000

all the elements are printed (one element per group, which is expected), apart from the last one.

Update #2, 23.09.2018:

Update #1 is answered in this comment.

Recommended answer

When a finite source reaches the end, if you are using event time then a Watermark with timestamp Long.MAX_VALUE will be injected, which will cause all event time timers to fire. However, with processing time, Flink will wait for all currently firing timers to complete their actions, and then exit.
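The difference between the two shutdown paths can be pictured with a small toy model in plain Scala. This is only an illustration of the behavior described above, not Flink's implementation: the names Timer, drainEventTime and drainProcessingTime are hypothetical, and "timers that have already come due" is used as a rough stand-in for "currently firing timers".

```scala
// Toy model only: NOT Flink code, just the shutdown behavior described above.
object TimerShutdownModel {
  // A pending window timer: when it is due (ms) and what it would emit.
  final case class Timer(dueAt: Long, payload: List[String])

  // Event time: the end of input injects a watermark of Long.MaxValue,
  // so every registered timer becomes due and fires.
  def drainEventTime(timers: List[Timer]): List[List[String]] = {
    val finalWatermark = Long.MaxValue
    timers.filter(_.dueAt <= finalWatermark).sortBy(_.dueAt).map(_.payload)
  }

  // Processing time: only timers already due at the wall-clock moment of
  // shutdown get to run; timers scheduled for later are dropped.
  def drainProcessingTime(timers: List[Timer], shutdownAt: Long): List[List[String]] =
    timers.filter(_.dueAt <= shutdownAt).sortBy(_.dueAt).map(_.payload)

  def main(args: Array[String]): Unit = {
    // One window ending at t = 1000 ms that holds all five elements.
    val timers = List(Timer(1000L, List("a", "b", "c", "d", "e")))
    println(drainEventTime(timers))           // List(List(a, b, c, d, e))
    println(drainProcessingTime(timers, 50L)) // List(): the source ended after ~50 ms
  }
}
```

In the model, the processing-time window is lost simply because the job ends at 50 ms, long before the timer at 1000 ms is due.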

As you suspected, you're not seeing any output because the source finishes very quickly.

Deterministic behavior is straightforward with event time processing; with processing time it's not really achievable.
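For comparison, here is a minimal sketch of an event-time version of the job from the question. It rests on two assumptions that are not in the original: a Flink 1.x release before 1.12 (where event time has to be enabled explicitly on the environment), and the character code of each element used as a made-up timestamp in milliseconds. The object name EventTimeFiniteSource is hypothetical.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object EventTimeFiniteSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: Flink 1.x before 1.12, where the default is processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromCollection(List("a", "b", "c", "d", "e"))
      // Made-up timestamps: the character code, read as milliseconds.
      .assignAscendingTimestamps(_.charAt(0).toLong)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit =
          out.collect(elements.toList.sorted.toString())
      })
      .print()

    env.execute()
  }
}
```

With these timestamps all five elements fall into the same one-second window, and the final watermark injected at the end of the source is what should make that window fire, regardless of how quickly the source finishes.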

But here's a hack that more or less works:

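import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The real, finite input.
val s = env.fromCollection(List("a", "b", "c", "d", "e"))

// Endless dummy source: it keeps the job alive so the processing-time
// timers get a chance to fire. The job never finishes on its own.
val t = env.addSource((context: SourceContext[String]) => {
  while (true) {
    Thread.sleep(100)
    context.collect("dummy")
  }
})

s.union(t)
  // Drop the dummy records before they reach the window.
  .filter(_ != "dummy")
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()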
