What is the watermark heuristic for PubsubIO running on GCD?


Problem Description

Hi, I'm trying to run a pipeline in which I calculate diffs between messages that are published to Pub/Sub with 30-second heartbeats* (10K streams, each heartbeating every 30 seconds). I don't care about 100% data completeness, but I'd like to understand what the watermark heuristic is for PubsubIO (and whether I can tweak it), to determine whether I can ignore late data with sufficiently low loss.

*Note: the Pub/Sub topic provides [potentially days' worth of] persistence in case we have to take down the pipeline, so it's important that the heuristic works well with a backlogged subscription.

Can someone explain how the watermark is calculated (assuming timestampLabel() is used), and how it can be adjusted, if at all?
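
For context, here is a minimal sketch of the kind of setup being described, using the current Apache Beam Java SDK, where withTimestampAttribute() is the successor to the timestampLabel() mentioned above. The subscription path, the "ts" attribute name, and the window and lateness values are placeholders, not part of the original question.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class HeartbeatDiffPipeline {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Event time is taken from a message attribute; withTimestampAttribute()
        // is the current Beam equivalent of the older timestampLabel().
        PCollection<String> heartbeats =
            p.apply("ReadHeartbeats",
                PubsubIO.readStrings()
                    .fromSubscription("projects/my-project/subscriptions/heartbeats") // placeholder
                    .withTimestampAttribute("ts"));                                   // placeholder name

        // 30s fixed windows with a small allowed lateness; anything later is dropped,
        // which matches the "some loss is acceptable" requirement.
        heartbeats.apply("Window",
            Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))
                .withAllowedLateness(Duration.standardSeconds(10))
                .discardingFiredPanes());
        // ...per-stream diff computation would follow here (omitted).

        p.run();
      }
    }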

Answer

Here's a quick description of how we compute the PubSub watermark:

Our goal is to build a reasonable heuristic watermark for data sent into the streaming pipeline via PubSub. We make some assumptions about the source that is sending data into PubSub. Specifically, we assume that the timestamps of the original data are "well behaved"; in other words, we expect a bounded amount of out-of-order timestamps on the source data before it is sent to PubSub. Any data sent with timestamps outside the allowed out-of-order bound will be considered late data. In our current implementation this bound is 10 seconds, meaning that reordering of timestamps by up to 10 seconds before sending to PubSub will not create late data. We call this value the estimation band. The problem of building a PubSub watermark then reduces to ensuring that no additional data becomes late due to transmission via PubSub.
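
To make the estimation band concrete, here is a small, self-contained sketch (plain Java, not the actual service code) of the rule described above: an event counts as late only if its timestamp lags the newest source timestamp seen so far by more than the 10-second band.

    import java.time.Duration;
    import java.time.Instant;

    public class EstimationBandCheck {
      // The out-of-order bound described above: up to 10 seconds of reordering
      // before publishing to PubSub does not create late data.
      private static final Duration ESTIMATION_BAND = Duration.ofSeconds(10);

      private Instant maxEventTimeSeen = Instant.EPOCH;

      /** Returns true if this event would be considered late under the band. */
      public boolean isLate(Instant eventTime) {
        if (eventTime.isAfter(maxEventTimeSeen)) {
          maxEventTimeSeen = eventTime;
          return false;
        }
        // Late only if it lags the newest timestamp seen by more than the band.
        return Duration.between(eventTime, maxEventTimeSeen).compareTo(ESTIMATION_BAND) > 0;
      }
    }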

What are the challenges we face with PubSub? Since PubSub does not guarantee ordering, we need some kind of additional metadata to know enough about the backlog. Luckily, PubSub provides a measure of the backlog in terms of the "oldest unacknowledged publish timestamp". This is not the same as the event timestamp of our message: PubSub is agnostic to the application-level metadata sent through it; instead, this is the timestamp of when the message was ingested by PubSub.

While this measurement sounds similar to a watermark, it is not the same. We cannot simply use the oldest unacknowledged publish timestamp as the watermark. These timestamps are not equal to the event timestamps, and in the case that historical (past) data is being sent, they may be arbitrarily far apart. The ordering of these timestamps may also be different since, as mentioned above, we allow a limited amount of reordering. However, we can use this as a measure of backlog to learn enough about the event timestamps present in the backlog to establish a reasonable watermark, as follows.

We call the subscription on which data is arriving the base subscription. Looking at our base subscription, we see that messages may arrive out of order. We label each message with its PubSub publish timestamp "pt" and its event-time timestamp "et". Note that the two time domains can be unrelated.

Some messages on the base subscription are unacknowledged, forming a backlog. This may be because they have not yet been delivered, or because they have been delivered but not yet processed. Remember also that pulls from this subscription are distributed across multiple shards. Thus it is not possible to say, just by looking at the base subscription, what our watermark should be.

We proceed by creating a second, metadata-only tracking subscription, which is used to effectively inspect the backlog of the base subscription and take the minimum of the event timestamps in the backlog. By maintaining little or no backlog on the tracking subscription, we can inspect the messages ahead of the base subscription's oldest unacked message.

We stay caught up on the tracking subscription by ensuring that pulling from it is computationally inexpensive. Conversely, if we fall sufficiently far behind on the tracking subscription, we stop advancing the watermark. To do so, we ensure that at least one of the following conditions is met (a small sketch of this check follows the list):

  • The tracking subscription is sufficiently ahead of the base subscription, meaning it is ahead by at least the estimation band. This ensures that any bounded reordering within the estimation band is taken into account.
  • The tracking subscription is sufficiently close to real time. In other words, there is no backlog on the tracking subscription.
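
Here is a rough model of that check in plain Java. The parameter names and the "caught up" threshold are illustrative assumptions; only the two conditions themselves come from the description above.

    import java.time.Duration;
    import java.time.Instant;

    // Simplified model of the "may we advance the watermark?" check.
    class WatermarkAdvanceCheck {
      private static final Duration ESTIMATION_BAND = Duration.ofSeconds(10);

      static boolean mayAdvanceWatermark(
          Instant baseOldestUnackedPublishTime,
          Instant trackingOldestUnackedPublishTime,
          Duration trackingBacklogAge) {

        // Condition 1: the tracking subscription is ahead of the base subscription
        // by at least the estimation band, so bounded reordering is accounted for.
        boolean sufficientlyAhead =
            !trackingOldestUnackedPublishTime.isBefore(
                baseOldestUnackedPublishTime.plus(ESTIMATION_BAND));

        // Condition 2: the tracking subscription is essentially caught up to real
        // time, i.e. it has no meaningful backlog (threshold is illustrative).
        boolean caughtUp = trackingBacklogAge.compareTo(Duration.ofSeconds(1)) <= 0;

        return sufficientlyAhead || caughtUp;
      }
    }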

We ack the messages on the tracking subscription as soon as possible, once we have durably persisted metadata about their publish and event timestamps. We store this metadata in a sparse histogram format to minimize the amount of space used and the size of the durable writes.
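
The following sketch illustrates the sparse-histogram idea in plain Java: event timestamps pulled from the tracking subscription are bucketed (here to whole seconds) and stored as counts, so the persisted metadata stays small. The bucket width and data structure are assumptions, not the actual storage format.

    import java.time.Instant;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Illustrative sketch: counts of event timestamps, bucketed to whole seconds.
    class SparseTimestampHistogram {
      private final SortedMap<Long, Long> countsByEpochSecond = new TreeMap<>();

      /** Record one message's event timestamp before the message is acked. */
      void add(Instant eventTime) {
        countsByEpochSecond.merge(eventTime.getEpochSecond(), 1L, Long::sum);
      }

      /** Smallest bucketed event time currently recorded, or null if empty. */
      Instant minEventTime() {
        return countsByEpochSecond.isEmpty()
            ? null
            : Instant.ofEpochSecond(countsByEpochSecond.firstKey());
      }
    }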

Finally, we ensure that we have enough data to make a reasonable watermark estimate. We take a band of event timestamps whose publish timestamps fall in the range:

 [ min(base sub oldest unacked, tracking sub oldest unacked - 10 sec),
   tracking sub oldest unacked ]

This ensures that we consider all event timestamps in the backlog, or, if the backlog is small, the most recent estimation band, in order to make a watermark estimate.

Finally, the watermark value is computed to be the minimum event time in the band.
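
Putting the last few steps together, here is a simplified model (plain Java, illustrative names only, not the actual service code) of selecting the band of publish timestamps and taking the minimum event time within it:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;
    import java.util.Optional;

    class WatermarkEstimator {
      private static final Duration ESTIMATION_BAND = Duration.ofSeconds(10);

      /** A message's PubSub publish time ("pt") and event time ("et"). */
      record TimestampPair(Instant publishTime, Instant eventTime) {}

      Optional<Instant> estimateWatermark(
          Instant baseOldestUnackedPublishTime,
          Instant trackingOldestUnackedPublishTime,
          List<TimestampPair> observed) {

        // Band of publish timestamps:
        // [ min(base oldest unacked, tracking oldest unacked - 10 sec), tracking oldest unacked ]
        Instant bandStart =
            min(baseOldestUnackedPublishTime,
                trackingOldestUnackedPublishTime.minus(ESTIMATION_BAND));
        Instant bandEnd = trackingOldestUnackedPublishTime;

        // Watermark = minimum event time among messages whose publish time is in the band.
        return observed.stream()
            .filter(m -> !m.publishTime().isBefore(bandStart) && !m.publishTime().isAfter(bandEnd))
            .map(TimestampPair::eventTime)
            .min(Instant::compareTo);
      }

      private static Instant min(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
      }
    }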

Note also that this method is correct but produces an overly conservative watermark: since we consider all messages on the tracking subscription ahead of the base subscription's oldest unacked message, the watermark estimate may include event timestamps for messages that have already been acknowledged.

Additionally, there are a few heuristics to ensure progress. The method above works well for dense, frequently arriving data. For sparse or infrequent data, there may not be enough recent messages to build a reasonable estimate. If we have not seen data on the subscription for more than two minutes (and there is no backlog), we advance the watermark to near real time. This ensures that the watermark and the pipeline continue to make progress even if no more messages are forthcoming.
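
A rough model of that idle heuristic, where the two-minute threshold comes from the text and everything else (names, the use of "now" as the advanced value) is illustrative:

    import java.time.Duration;
    import java.time.Instant;

    // Simplified model of the progress heuristic described above.
    class IdleWatermarkAdvancer {
      private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(2);

      Instant maybeAdvance(Instant currentWatermark, Instant lastMessageSeenAt,
                           boolean backlogEmpty, Instant now) {
        if (backlogEmpty && Duration.between(lastMessageSeenAt, now).compareTo(IDLE_TIMEOUT) > 0) {
          // No data and no backlog for over two minutes: advance to near real time.
          return now;
        }
        return currentWatermark;
      }
    }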

All of the above ensures that as long as the reordering of source-data event timestamps stays within the estimation band, there will be no additional late data.
