Consuming unbounded data in windows with default trigger

Problem description

I have a Pub/Sub topic + subscription and want to consume and aggregate the unbounded data from the subscription in a Dataflow pipeline. I use a fixed window and write the aggregates to BigQuery.

Reading and writing (without windowing and aggregation) works fine. But when I pipe the data into a fixed window (to count the elements in each window), the window never triggers, and thus the aggregates are not written.

Here is my word publisher (it uses kinglear.txt from the examples as the input file):

public static class AddCurrentTimestampFn extends DoFn<String, String> {
    // Stamps every element with the current processing time as its event timestamp.
    @ProcessElement public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
    }
}

public static class ExtractWordsFn extends DoFn<String, String> {
    // Splits each input line into words, dropping empty tokens.
    @ProcessElement public void processElement(ProcessContext c) {
        String[] words = c.element().split("[^a-zA-Z']+");
        for (String word : words) { if (!word.isEmpty()) { c.output(word); } }
    }
}

// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();

Here is my windowed word counter:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options

BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
        .withSchema(o.getSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)));

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))
        .apply("WriteRows", tablePipe);
p.run();

The above subscriber will not work, since the window does not seem to trigger using the default trigger. However, if I manually define a trigger, the code works and the counts are written to BigQuery.

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

I'd like to avoid specifying custom triggers if possible.

Questions:

  1. Why does my solution not work with Dataflow's default trigger?
  2. How do I need to change the publisher or the subscriber so that it works with the default trigger?

Recommended answer

How are you determining the trigger never fires?

Your PubsubIO.Write and PubsubIO.Read transforms should both specify a timestamp label using withTimestampLabel, otherwise the timestamps you've added will not be written to PubSub and the publish times will be used.
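
For illustration, here is a sketch of how both pipelines could carry the element timestamps through Pub/Sub. It assumes the Dataflow 1.x-style API used in the question, where the builder method is written here as timestampLabel (newer Apache Beam releases call the equivalent withTimestampAttribute), and an arbitrary attribute name "ts":

// Publisher: store each element's timestamp in the "ts" attribute of the message.
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic())
                .timestampLabel("ts"));

// Subscriber: read the timestamps back from the same attribute, so the
// watermark is derived from them rather than from the publish times.
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription())
                .timestampLabel("ts"))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))
        .apply("WriteRows", tablePipe);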

Either way, the input watermark of the pipeline will be derived from the timestamps of the elements waiting in PubSub. Once all inputs have been processed, it will stay back for a few minutes (in case there was a delay in the publisher) before advancing to real time.

What you are likely seeing is that all the elements are published in the same ~1 second window (since the input file is pretty small). These are all read and processed relatively quickly, but the 1-second window they are put in will not trigger until after the input watermark has advanced, indicating that all data in that 1-second window has been consumed.

This won't happen for several minutes, which may make it look like the trigger isn't working. The trigger you wrote fires after 1 second of processing time, so it fires much earlier, but there is no guarantee that all the data has been processed by then.
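
For comparison, spelling out what the default trigger does (a rough sketch of its equivalent explicit configuration) shows why nothing can fire before the watermark passes the end of the 1-second window:

Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        // Fires once the watermark passes the end of each 1-second window;
        // while the watermark is held back, no panes are emitted.
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();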

Steps to get better behavior from the default trigger:

  1. Use withTimestampLabel in both the Pub/Sub write and read steps.
  2. Have the publisher spread the timestamps out further (for example, run it for a few minutes and spread the timestamps across that range); see the sketch after this list.
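
As a sketch of step 2, the publisher's timestamp function could spread elements over a range instead of stamping them all with the same instant. The two-minute span and the random backward shift below are only illustrative assumptions:

public static class AddSpreadTimestampFn extends DoFn<String, String> {
    private static final java.util.Random RANDOM = new java.util.Random();

    @ProcessElement public void processElement(ProcessContext c) {
        // Shift each timestamp back by up to two minutes so the elements
        // land in many different 1-second windows instead of a single one.
        long shiftMillis = RANDOM.nextInt(2 * 60 * 1000);
        c.outputWithTimestamp(c.element(),
                new Instant(System.currentTimeMillis() - shiftMillis));
    }
}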
