Windowing with Apache Beam - Fixed Windows Don't Seem to be Closing?

Question

We are attempting to use fixed windows on an Apache Beam pipeline (using DirectRunner). Our flow is as follows:

  1. Pull data from Pub/Sub
  2. Deserialize JSON into a Java object
  3. Window events with fixed windows of 5 seconds
  4. Using a custom CombineFn, combine each window of Events into a List<Event>
  5. For the sake of testing, simply output the resulting List<Event>

Pipeline code:

    pipeline
        // Read from pubsub topic to create unbounded PCollection
        .apply(PubsubIO
            .<String>read()
            .topic(options.getTopic())
            .withCoder(StringUtf8Coder.of())
        )

        // Deserialize JSON into Event object
        .apply("ParseEvent", ParDo
            .of(new ParseEventFn())
        )

        // Window events with a fixed window size of 5 seconds
        .apply("Window", Window
            .<Event>into(FixedWindows
                .of(Duration.standardSeconds(5))
            )
        )

        // Group events by window
        .apply("CombineEvents", Combine
            .globally(new CombineEventsFn())
            .withoutDefaults()
        )

        // Log grouped events
        .apply("LogEvent", ParDo
            .of(new LogEventFn())
        );

The result we are seeing is that the final step is never run, as we don't get any logging.

Also, we have added System.out.println("***") in each method of our custom CombineFn class, in order to track when these are run, and it seems they don't run either.

Is windowing set up incorrectly here? We followed an example found at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but clearly there is something fundamental missing.

Any insight is appreciated - thanks in advance!

Recommended answer

Looks like the main issue was indeed a missing trigger - the window was opening and there was nothing telling it when to emit results. We wanted to simply window based on processing time (not event time) and so did the following:

    .apply("Window", Window
        .<Event>into(new GlobalWindows())
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
    )

Essentially this creates a global window, which is triggered to emit events 5 seconds after the first element is processed. Every time the window is closed, another is opened once it receives an element. Beam complained when we didn't have the withAllowedLateness piece - as far as I know this just tells it to ignore any late data.
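To make the pane behaviour described above concrete, here is a rough plain-Java simulation. It does not use Beam at all and is not how Beam implements triggers; the class name `PaneSimulation`, the `simulate` method, and the rule that an element arriving exactly at the firing time starts the next pane are all assumptions made for illustration. It only mimics the idea: the first element of a pane starts a 5 second timer, everything that arrives before the timer fires lands in that pane, and because panes are discarded after firing, each pane holds only new elements.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Beam dependency) of how arrivals fall into panes. */
class PaneSimulation {

    /**
     * Groups processing-time arrival timestamps (milliseconds, ascending) into panes.
     * The first element of a pane starts a timer; the pane fires delayMillis later.
     * Assumption: an element arriving exactly at the firing time starts a new pane.
     */
    static List<List<Long>> simulate(long[] arrivalsMillis, long delayMillis) {
        List<List<Long>> panes = new ArrayList<>();
        List<Long> current = null; // pane currently collecting elements
        long fireAt = 0L;          // processing time at which the current pane fires

        for (long t : arrivalsMillis) {
            if (current == null || t >= fireAt) {
                // Previous pane (if any) has already fired: start a fresh one.
                current = new ArrayList<>();
                panes.add(current);
                fireAt = t + delayMillis;
            }
            current.add(t);
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1_000, 4_999, 5_000, 7_000, 12_000, 12_001};
        // prints [[0, 1000, 4999], [5000, 7000], [12000, 12001]]
        System.out.println(simulate(arrivals, 5_000));
    }
}
```

Note that nothing is emitted for the quiet stretch between 10 s and 12 s in this example: with no element to start the timer, there is no pane to fire.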

My understanding may be a bit off the mark here, but the above snippet has solved our problem!
