JdbcIO.Write.withResults and Wait.on with an unbounded PCollection with FixedWindow

Question

I am trying to implement a pipeline similar to the one outlined in this question. Unlike the situation described in BEAM-6732, my source is a Pub/Sub subscription, and instead of using Wait.on to write to another table, I am trying to use it to determine when the writes are complete, then generate a message and route it to a Pub/Sub topic.

I tried using the default window, but according to the Wait.on documentation it does not work for unbounded collections. I then tried defining a fixed window manually, with a lower allowed lateness, but that does not seem to work either; the window I used is shown below. The steps after JdbcIO.write always seem to be stuck, i.e. there is no output from the Wait step.

```java
// Windowing strategy tried in the question
Window.into(FixedWindows.of(Duration.standardSeconds(10)))
    .triggering(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))
                .orFinally(AfterWatermark.pastEndOfWindow())
        )
    ).withAllowedLateness(Duration.standardMinutes(2)).discardingFiredPanes();
```

Looking for advice on what could be wrong, and also on what the impact would be of using a low allowedLateness with a Pub/Sub source, which does not guarantee ordering.

Answer

It seems that Wait can be used with unbounded sources, since it operates per window. The Wait SDK documentation says that Wait.on:

"returns a PCollection with contents identical to the input, but delays producing elements of the output in window W until the signal's window W closes (signal's watermark passes W.end + signal.allowedLateness)"
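As a simplified model of the quoted rule (not Beam's actual implementation), the release condition for window W can be written as a plain-Java check. The window end, the 2-minute lateness (borrowed from the question's window), and the watermark values are illustrative assumptions.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model of the documented Wait.on rule: elements of window W are
 * released once the signal's watermark passes W.end + signal.allowedLateness.
 */
public class WaitReleaseModel {

    /** The watermark value that must be passed before window W is released. */
    public static Instant releaseThreshold(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    /** True once the signal watermark is strictly past W.end + allowedLateness. */
    public static boolean isReleased(Instant signalWatermark, Instant windowEnd,
                                     Duration allowedLateness) {
        return signalWatermark.isAfter(releaseThreshold(windowEnd, allowedLateness));
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2024-01-01T00:00:10Z"); // end of a 10 s fixed window
        Duration lateness = Duration.ofMinutes(2);                  // as in the question's window
        Instant[] watermarks = {
            Instant.parse("2024-01-01T00:00:11Z"), // just after the window end
            Instant.parse("2024-01-01T00:02:11Z"), // just after end + allowed lateness
        };
        for (Instant wm : watermarks) {
            System.out.println(wm + " released=" + isReleased(wm, windowEnd, lateness));
        }
    }
}
```

With a 2-minute allowed lateness, the output for a window ending at 00:00:10 is held until the signal watermark moves past 00:02:10, which can look like a stall if you only watch the first minutes of a streaming job.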

Consider the documented pattern "apply T to X after Y is ready": X.apply(Wait.on(Y)).apply(T). If I understand correctly, it maps onto your use case as follows:

  • T = Generate the message
  • Y = JdbcIO.Write.withResults
  • X = The collection emitted to Pub/Sub
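A minimal sketch of that wiring with the Beam Java SDK, assuming the Beam JdbcIO and Google Cloud Platform IO modules plus a PostgreSQL JDBC driver are on the classpath and the pipeline runs in streaming mode (for example with `--streaming`). The subscription, topic, table, connection URL and the `toNotification` helper are hypothetical placeholders. Here the windowed input read from the subscription is X, the `PCollection<Void>` returned by the JDBC write is Y, and T builds the completion message and publishes it. The window has no custom trigger, and the zero allowed lateness is an assumption chosen so each window closes as soon as the watermark passes its end.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class WaitOnJdbcSketch {

    /** Hypothetical notification format; adapt to your own message schema. */
    public static String toNotification(String payload) {
        return "written:" + payload;
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // X: windowed input from Pub/Sub (placeholder subscription), default trigger.
        PCollection<String> input = p
            .apply("ReadPubsub", PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-sub"))
            .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                .withAllowedLateness(Duration.ZERO));

        // Y: withResults() turns the write into a PCollection<Void> usable as a signal;
        // it inherits the fixed windows of `input`.
        PCollection<Void> written = input.apply("WriteJdbc", JdbcIO.<String>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://db-host/mydb"))
            .withStatement("INSERT INTO events (payload) VALUES (?)")
            .withPreparedStatementSetter((element, stmt) -> stmt.setString(1, element))
            .withResults());

        // T: runs on a window of X only after the same window of Y has closed.
        input
            .apply("WaitForWrite", Wait.on(written))
            .apply("ToNotification", MapElements.into(TypeDescriptors.strings())
                .via((String payload) -> toNotification(payload)))
            .apply("PublishDone", PubsubIO.writeStrings()
                .to("projects/my-project/topics/my-topic"));

        p.run();
    }
}
```

Because the signal is derived from the same windowed collection, X and Y share identical windows, which is what Wait.on needs in order to match window W of the input with window W of the signal.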

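While a FixedWindow may be required in your code snippet, I don't think the trigger is needed, since Wait already waits for the end of the window (the signal's watermark passing W.end + signal.allowedLateness). Additionally, with a low allowedLateness the window closes sooner after it ends, so the waiting output is released earlier. The trade-off is that elements arriving after the watermark has passed W.end + allowedLateness are dropped as too late, which matters for a source such as Pub/Sub that does not guarantee ordering.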
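To make that trade-off concrete, here is a plain-Java model (not Beam code) of the drop rule under stated assumptions: epoch-aligned fixed windows, and an element counted as dropped when the input watermark at its arrival is already past its window's end plus the allowed lateness. The `Arrival` type, timestamps and window size are hypothetical illustrations.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class LatenessTradeoff {

    /** An element's event timestamp and the input watermark when it arrived. */
    public static final class Arrival {
        final Instant eventTime;
        final Instant watermarkAtArrival;

        public Arrival(Instant eventTime, Instant watermarkAtArrival) {
            this.eventTime = eventTime;
            this.watermarkAtArrival = watermarkAtArrival;
        }
    }

    /** Exclusive end of the epoch-aligned fixed window that contains t. */
    public static Instant windowEndFor(Instant t, Duration size) {
        long sizeMs = size.toMillis();
        long start = Math.floorDiv(t.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(start + sizeMs);
    }

    /** Simplified rule: dropped if its window had already expired on arrival. */
    public static boolean isDropped(Arrival a, Duration windowSize, Duration allowedLateness) {
        Instant windowEnd = windowEndFor(a.eventTime, windowSize);
        return a.watermarkAtArrival.isAfter(windowEnd.plus(allowedLateness));
    }

    public static long countDropped(List<Arrival> arrivals, Duration windowSize,
                                    Duration allowedLateness) {
        return arrivals.stream().filter(a -> isDropped(a, windowSize, allowedLateness)).count();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        List<Arrival> arrivals = List.of(
            // out of order: event at 00:00:05 arrives when the watermark is 00:00:40
            new Arrival(Instant.parse("2024-01-01T00:00:05Z"), Instant.parse("2024-01-01T00:00:40Z")),
            // on time: event at 00:00:15 arrives while its window is still open
            new Arrival(Instant.parse("2024-01-01T00:00:15Z"), Instant.parse("2024-01-01T00:00:12Z")));
        System.out.println("lateness 0s:   dropped=" + countDropped(arrivals, size, Duration.ZERO));
        System.out.println("lateness 2min: dropped=" + countDropped(arrivals, size, Duration.ofMinutes(2)));
    }
}
```

With zero lateness the out-of-order element is dropped but Wait can release each window almost immediately; with 2 minutes it is kept, at the cost of holding each window's output for roughly 2 extra minutes.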
