Windowing with Apache Beam - Fixed Windows Don't Seem to be Closing?

Question

We are attempting to use fixed windows in an Apache Beam pipeline (using the DirectRunner). Our flow is as follows:


  1. Pull data from pub/sub
  2. Deserialize JSON into Java object
  3. Window events into fixed windows of 5 seconds
  4. Using a custom CombineFn, combine each window of Events into a List<Event>
  5. For the sake of testing, simply output the resulting List<Event>
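
Steps 3 and 4 can be modeled without Beam to make the expected per-window output concrete. The sketch below is plain Java, not Beam code. `FixedWindowSketch`, its `Event` class (a hypothetical stand-in for the question's `Event`), and its methods are illustrative names. The sketch assumes 5-second windows aligned to the epoch (offset 0) and groups events by their timestamps, so each map entry corresponds to one `List<Event>` that a list-building `CombineFn` would produce for one window. It deliberately ignores triggers, that is, *when* Beam actually emits each window's result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of steps 3 and 4 (no Beam dependency): assign each event to a
// 5-second fixed window and collect the events of each window into a List.
public class FixedWindowSketch {

    // Hypothetical stand-in for the question's Event class: a timestamp plus a payload.
    public static final class Event {
        public final long timestampMillis;
        public final String payload;

        public Event(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return payload + "@" + timestampMillis;
        }
    }

    public static final long WINDOW_SIZE_MILLIS = 5_000;

    // Start of the fixed window containing the timestamp, assuming windows are
    // aligned to the epoch (offset 0). floorMod keeps negative timestamps correct.
    public static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_SIZE_MILLIS);
    }

    // Groups events by window start. Each value plays the role of the List<Event>
    // that a list-building CombineFn would output for that window.
    public static Map<Long, List<Event>> combinePerWindow(List<Event> events) {
        Map<Long, List<Event>> byWindow = new TreeMap<>();
        for (Event e : events) {
            byWindow.computeIfAbsent(windowStart(e.timestampMillis), k -> new ArrayList<>()).add(e);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000, "a"),
                new Event(4_999, "b"),
                new Event(5_000, "c"),
                new Event(12_500, "d"));
        // Prints one line per window, e.g. "[0, 5000): [a@1000, b@4999]"
        combinePerWindow(events).forEach((start, list) ->
                System.out.println("[" + start + ", " + (start + WINDOW_SIZE_MILLIS) + "): " + list));
    }
}
```

Running `main` prints one line per window. `b` (at 4,999 ms) and `c` (at 5,000 ms) land in different windows because each window's end is exclusive.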

Pipeline code:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn())
                );

The result we are seeing is that the final step never runs: we don't get any logging from it.

Also, we have added System.out.println("***") in each method of our custom CombineFn class to track when they are called, and it seems none of them run either.

Is windowing set up incorrectly here? We followed an example found at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but clearly there is something fundamental missing.

Any insight is appreciated - thanks in advance!

Answer

It looks like the main issue was indeed a missing trigger: the window was opening, but nothing was telling it when to emit results. We simply wanted to window based on processing time (not event time), so we did the following:

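    .apply("Window", Window
        // Put every event into the single global window
        .<Event>into(new GlobalWindows())
        // Fire a pane 5 seconds (processing time) after the first element of each pane arrives, repeatedly
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO)
        // Each fired pane contains only elements that were not emitted in an earlier pane
        .discardingFiredPanes()
    )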

Essentially, this puts every event into a single global window and triggers it to emit a pane 5 seconds (in processing time) after the first element of that pane is processed. Each time a pane fires, the next pane starts when the next element arrives, and because fired panes are discarded, each pane contains only new events. Beam complained when we didn't have the withAllowedLateness piece; as far as I know, this just tells it to ignore any late data.
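
The pane behavior described above can be modeled in plain Java to show which elements end up together. This is a simplified sketch, not Beam's trigger implementation. `ProcessingTimePaneSketch` and `panes` are hypothetical names, and the input is assumed to be a sorted list of processing-time arrival times in milliseconds. An element that arrives exactly when the 5-second delay expires is assumed to start the next pane, because real firing times are only approximate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Beam dependency) of GlobalWindows +
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5s))
// + discardingFiredPanes(). Input: processing-time arrivals in ms, sorted ascending.
public class ProcessingTimePaneSketch {

    static final long DELAY_MILLIS = 5_000;

    // Returns the emitted panes; each inner list holds the arrival times of its elements.
    public static List<List<Long>> panes(List<Long> arrivalsMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long fireAt = Long.MIN_VALUE; // only meaningful while the current pane is non-empty
        for (long t : arrivalsMillis) {
            if (!current.isEmpty() && t >= fireAt) {
                // The delay expired before this element arrived: emit the pane and,
                // because fired panes are discarded, start the next pane empty.
                fired.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                // The first element in a pane starts the 5-second processing-time delay.
                fireAt = t + DELAY_MILLIS;
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            fired.add(current); // the last pending delay eventually expires as well
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(panes(Arrays.asList(0L, 1_000L, 4_000L, 6_000L, 7_500L, 20_000L)));
    }
}
```

For the sample arrivals in `main`, it prints `[[0, 1000, 4000], [6000, 7500], [20000]]`. With `.accumulatingFiredPanes()` instead of `.discardingFiredPanes()`, each later pane would also repeat the elements of earlier panes. This sketch models only the discarding case.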

My understanding may be a bit off the mark here, but the above snippet has solved our problem!
