Consuming unbounded data in windows with default trigger

Question

I have a Pub/Sub topic + subscription and want to consume and aggregate the unbounded data from the subscription in a Dataflow pipeline. I use a fixed window and write the aggregates to BigQuery.

Reading and writing (without windowing and aggregation) works fine. But when I pipe the data into a fixed window (to count the elements in each window), the window never triggers, so the aggregates are never written.

Here is my word publisher (it uses kinglear.txt from the examples as the input file):

// Assigns the current wall-clock time as each element's event timestamp.
public static class AddCurrentTimestampFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
    }
}

// Splits each line into words and emits every non-empty word.
public static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement public void processElement(ProcessContext c) {
        String[] words = c.element().split("[^a-zA-Z']+");
        for (String word : words) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))           // bounded read of the input file
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));  // publish each word as a message
p.run();

Here is my windowed word counter:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options

// BigQuery sink: create the table if needed and append rows.
BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
        .withSchema(o.getSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

// 1-second fixed windows, relying on the default trigger.
Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)));

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())        // per-word counts within each window
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))  // user-defined DoFn (not shown) mapping counts to rows
        .apply("WriteRows", tablePipe);
p.run();

The subscriber above does not work, because the window never seems to fire with the default trigger. However, if I define a trigger manually, the code works and the counts are written to BigQuery.

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

I would like to avoid specifying custom triggers if possible.

Questions:

  1. Why does my solution not work with Dataflow's default trigger?
  2. How do I have to change my publisher or subscriber to trigger windows using the default trigger?

Recommended answer

How are you determining that the trigger never fires?

Your PubsubIO.Write and PubsubIO.Read transforms should both specify a timestamp label using withTimestampLabel; otherwise the timestamps you've added will not be written to Pub/Sub, and the publish times will be used instead.
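
To make the round trip concrete, here is a minimal plain-Java model of it. This is not the Dataflow/Beam API: `PubsubTimestampModel`, `publish`, `readTimestamp` and the attribute name `"ts"` are hypothetical. It only shows that the element's timestamp survives when both sides agree on a label, and that the reader otherwise falls back to the publish time. Treating a configured-but-missing attribute as an error is an assumption of this model.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of event timestamps across a Pub/Sub write/read round trip.
 * NOT the Dataflow/Beam API; names and the "ts" label are illustrative only.
 */
public class PubsubTimestampModel {

    /** A message as the reader sees it: payload, attributes, publish time (ms). */
    static final class Message {
        final String payload;
        final Map<String, String> attributes;
        final long publishTimeMillis;

        Message(String payload, Map<String, String> attributes, long publishTimeMillis) {
            this.payload = payload;
            this.attributes = attributes;
            this.publishTimeMillis = publishTimeMillis;
        }
    }

    /** Writer side: the element timestamp is stored only if a label is configured. */
    static Message publish(String payload, long elementTimestampMillis,
                           String timestampLabel, long publishTimeMillis) {
        Map<String, String> attributes = new HashMap<>();
        if (timestampLabel != null) {
            attributes.put(timestampLabel, Long.toString(elementTimestampMillis));
        }
        return new Message(payload, attributes, publishTimeMillis);
    }

    /** Reader side: use the labeled attribute if configured, else the publish time. */
    static long readTimestamp(Message m, String timestampLabel) {
        if (timestampLabel == null) {
            return m.publishTimeMillis;
        }
        String value = m.attributes.get(timestampLabel);
        if (value == null) {
            // Assumption of this model: a configured label with no attribute is an error.
            throw new IllegalStateException("missing timestamp attribute: " + timestampLabel);
        }
        return Long.parseLong(value);
    }

    public static void main(String[] args) {
        long eventTime = 1_000L;   // timestamp set by AddCurrentTimestampFn
        long publishTime = 5_000L; // time the Pub/Sub service received the message

        // No label on either side: the reader sees the publish time.
        Message plain = publish("king", eventTime, null, publishTime);
        System.out.println(readTimestamp(plain, null));    // prints 5000

        // Same label on both sides: the element's own timestamp survives.
        Message labeled = publish("king", eventTime, "ts", publishTime);
        System.out.println(readTimestamp(labeled, "ts"));  // prints 1000
    }
}
```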

Either way, the input watermark of the pipeline will be derived from the timestamps of the elements waiting in Pub/Sub. Once all inputs have been processed, the watermark will hold back for a few minutes (in case there was a delay in the publisher) before advancing to real time.
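
The watermark behavior described above can be sketched as a small function. This is a simplified model of the answer's description, not Dataflow's actual (internal) algorithm; `WatermarkModel` is hypothetical and `HOLD_BACK_MILLIS = 2 minutes` is an assumed stand-in for "a few minutes".

```java
import java.util.Collections;
import java.util.List;

/**
 * Simplified model of a Pub/Sub source watermark. NOT Dataflow's real algorithm;
 * HOLD_BACK_MILLIS is an assumed value for "a few minutes".
 */
public class WatermarkModel {

    static final long HOLD_BACK_MILLIS = 2 * 60 * 1000L; // assumption: 2 minutes

    /**
     * @param backlog           event timestamps (ms) of messages still waiting in the subscription
     * @param previousWatermark last reported watermark (ms)
     * @param drainedAtMillis   wall-clock time (ms) when the backlog last became empty
     * @param nowMillis         current wall-clock time (ms)
     */
    static long watermark(List<Long> backlog, long previousWatermark,
                          long drainedAtMillis, long nowMillis) {
        if (!backlog.isEmpty()) {
            // The oldest pending message bounds the watermark; it never moves backwards.
            return Math.max(previousWatermark, Collections.min(backlog));
        }
        if (nowMillis - drainedAtMillis < HOLD_BACK_MILLIS) {
            // Backlog is empty, but hold back in case the publisher is merely delayed.
            return previousWatermark;
        }
        // After the hold-back period, advance to real (wall-clock) time.
        return Math.max(previousWatermark, nowMillis);
    }

    public static void main(String[] args) {
        // Messages pending: the watermark follows the oldest one.
        System.out.println(watermark(List.of(1_500L, 1_200L), 0L, 0L, 10_000L));  // prints 1200
        // Backlog drained at 10 s; 30 s later the watermark is still held back.
        System.out.println(watermark(List.of(), 1_200L, 10_000L, 40_000L));      // prints 1200
        // Once the hold-back period has passed, it jumps to wall-clock time.
        System.out.println(watermark(List.of(), 1_200L, 10_000L, 200_000L));     // prints 200000
    }
}
```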

What you are likely seeing is that all the elements are published within the same ~1-second window (since the input file is pretty small). They are all read and processed relatively quickly, but the 1-second window they are put in will not fire until the input watermark has advanced past the end of that window, indicating that all data in it has been consumed.
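
A plain-Java sketch of fixed windows plus the default trigger (fire once the watermark reaches the end of the window) shows why a small, quickly published input ends up in a single window that waits on the watermark. `FixedWindowModel` and its methods are hypothetical and are not Beam's API.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified model of FixedWindows plus the default trigger. Plain Java, NOT the Beam API.
 */
public class FixedWindowModel {

    /** Start of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    }

    /** Number of elements falling into each window, keyed by window start. */
    static TreeMap<Long, Integer> countPerWindow(long[] timestampsMillis, long sizeMillis) {
        TreeMap<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    /** Windows cover [start, start + size); fire once the watermark reaches the end. */
    static boolean defaultTriggerFires(long windowStartMillis, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 1_000L; // 1-second windows, as with FixedWindows.of(Duration.standardSeconds(1))
        // A small input published quickly: every timestamp lands in the same second.
        long[] timestamps = {10_050L, 10_300L, 10_700L, 10_990L};
        Map<Long, Integer> counts = countPerWindow(timestamps, size);
        System.out.println(counts); // prints {10000=4}

        // The elements are already processed, but the window waits for the watermark.
        System.out.println(defaultTriggerFires(10_000L, size, 10_500L)); // prints false
        System.out.println(defaultTriggerFires(10_000L, size, 11_000L)); // prints true
    }
}
```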

This won't happen until several minutes later, which can make it look as if the trigger isn't working. The trigger you wrote fires 1 second of processing time after the first element arrives in the pane, so it fires much earlier, but there is no guarantee that all the data for the window has been processed by then.
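
For contrast, here is a rough model of the processing-time trigger from the question: the first pane fires a fixed delay of wall-clock time after its first element arrives, independent of the watermark, so a straggler can miss it. `ProcessingTimeTriggerModel` is hypothetical, not Beam's API, and it only counts the first pane; what happens to later elements depends on how the trigger repeats and on the allowed lateness, which this sketch does not model.

```java
/**
 * Simplified model of a "delay after first element in pane" processing-time trigger.
 * Plain Java, NOT the Beam API; models only the first pane of one window.
 */
public class ProcessingTimeTriggerModel {

    /** Number of elements included in the first pane, given processing-time arrivals (ms). */
    static int firstPaneCount(long[] arrivalMillis, long delayMillis) {
        if (arrivalMillis.length == 0) {
            return 0;
        }
        long first = Long.MAX_VALUE;
        for (long t : arrivalMillis) {
            first = Math.min(first, t);
        }
        long fireAt = first + delayMillis; // pane fires here, whatever the watermark says
        int count = 0;
        for (long t : arrivalMillis) {
            if (t <= fireAt) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Three elements of the same event-time window; the last arrives 1.5 s after the first.
        long[] arrivals = {0L, 400L, 1_500L};
        // The pane fires at 1000 ms and contains only the first two elements.
        System.out.println(firstPaneCount(arrivals, 1_000L)); // prints 2
    }
}
```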

Steps to get better behavior from the default trigger:

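  1. Use withTimestampLabel on both the Pub/Sub write and read steps.
  2. Have the publisher spread the timestamps out further (e.g., run it for several minutes and spread the timestamps across that range).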
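
Step 2 can be sketched in plain Java: instead of stamping every word with "now", the publisher assigns timestamps spaced evenly over a range, so the words fill many 1-second windows and the watermark closes them one after another. `spreadTimestamps` and `distinctWindows` are hypothetical helpers; in the question's pipeline this role would fall to a modified AddCurrentTimestampFn, which is not shown here.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of spreading publisher timestamps over a range. Plain Java, NOT the Beam API.
 */
public class SpreadTimestamps {

    /** n timestamps spaced evenly across [startMillis, startMillis + rangeMillis). */
    static long[] spreadTimestamps(int n, long startMillis, long rangeMillis) {
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            out[i] = startMillis + (long) i * rangeMillis / n;
        }
        return out;
    }

    /** Number of distinct fixed windows of the given size that the timestamps fall into. */
    static int distinctWindows(long[] timestampsMillis, long sizeMillis) {
        Set<Long> starts = new HashSet<>();
        for (long ts : timestampsMillis) {
            starts.add(Math.floorDiv(ts, sizeMillis) * sizeMillis);
        }
        return starts.size();
    }

    public static void main(String[] args) {
        long threeMinutes = 3 * 60 * 1000L;
        long[] ts = spreadTimestamps(1_000, 0L, threeMinutes);
        // Spread over 3 minutes, 1000 words fill 180 one-second windows instead of one.
        System.out.println(distinctWindows(ts, 1_000L)); // prints 180
    }
}
```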
