Detecting keyed state changes

Question

I'm new to the Dataflow programming model and have some trouble wrapping my mind around what I think should be a simple use case:

I have a pipeline reading live data from Pub/Sub. The data contains device statuses with (simplified) a serial number and a state (UP or DOWN). A device is guaranteed to send its state at least every 5 minutes, but of course a device may send the same state multiple times.

What I'm trying to achieve is a pipeline that only emits state changes for a device, so basically keeping track of some notion of "last state per key" for a given key and comparing new events to that.

Is there a good way to do this at the moment?

Answer

There is a related question at "Remove duplicates across window triggers/firings" but your question brings up some subtleties that differ. So let me address two aspects separately and refer some parts to the linked question.

1. Taking the latest input value

Your question differs here because it is not obviously outputting the result of an associative & commutative Combine operation. This is important because in Dataflow & Beam, the input is not ordered - it just carries timestamps so that we can reason about it in event time.

Over pairs of (timestamp, UP/DOWN) you can define an associative & commutative operation that just takes the maximum of the timestamp, and carries the state with it. You'll have to make an arbitrary choice in the case of two equal timestamps, but it sounds like you don't expect to encounter this situation.
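As a minimal sketch of such an operation, here is a plain-Java merge function with no Beam dependency. The class name `LatestReading`, its fields, and the tie-breaking rule (enum order of the state) are assumptions made for illustration, not part of the original answer; in a pipeline, the same logic would sit inside a combine function.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (no Beam dependency); all names here are hypothetical. */
final class LatestReading {
  enum State { UP, DOWN }

  final long timestampMillis;
  final State state;

  LatestReading(long timestampMillis, State state) {
    this.timestampMillis = timestampMillis;
    this.state = state;
  }

  /**
   * Keeps the reading with the larger timestamp. Ties are broken by the enum
   * order of the state (an arbitrary but deterministic choice), so the
   * resulting value does not depend on argument order or grouping.
   */
  static LatestReading merge(LatestReading a, LatestReading b) {
    if (a.timestampMillis != b.timestampMillis) {
      return a.timestampMillis > b.timestampMillis ? a : b;
    }
    return a.state.compareTo(b.state) >= 0 ? a : b;
  }

  public static void main(String[] args) {
    // Readings arrive out of timestamp order, as they may in a pipeline.
    List<LatestReading> readings = Arrays.asList(
        new LatestReading(30, State.DOWN),
        new LatestReading(10, State.UP),
        new LatestReading(20, State.UP));
    LatestReading latest = readings.get(0);
    for (LatestReading r : readings) {
      latest = merge(latest, r);
    }
    System.out.println(latest.timestampMillis + " " + latest.state); // prints "30 DOWN"
  }
}
```

Making the tie-break deterministic is what keeps the operation commutative when two readings share a timestamp; any fixed rule would do.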

In order to express your desires naturally, we would need a feature whereby GroupByKey also does a secondary sort of your values per key (and window). In this case, you would sort by timestamp, but the feature is pretty general and we are aware of the use case.

That gets you as far as being able to express the "take the latest value" part of your logic.
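To illustrate why a secondary sort would make this natural, the sketch below shows what the per-key logic reduces to once one device's values are sorted by timestamp: a single pass that keeps an event only when its state differs from the previous one. This is plain Java rather than a Beam feature, the names `SortedChanges` and `Event` are hypothetical, and treating the first event as a change is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch (no Beam dependency); all names here are hypothetical. */
final class SortedChanges {
  enum State { UP, DOWN }

  static final class Event {
    final long timestampMillis;
    final State state;

    Event(long timestampMillis, State state) {
      this.timestampMillis = timestampMillis;
      this.state = state;
    }
  }

  /**
   * Sorts one device's events by timestamp and keeps only the events whose
   * state differs from the preceding event. The first event is always kept.
   */
  static List<Event> changes(List<Event> eventsForOneKey) {
    List<Event> sorted = new ArrayList<>(eventsForOneKey);
    sorted.sort(Comparator.comparingLong((Event e) -> e.timestampMillis));
    List<Event> out = new ArrayList<>();
    State previous = null;
    for (Event e : sorted) {
      if (e.state != previous) {
        out.add(e);
        previous = e.state;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event(20, State.UP),
        new Event(10, State.UP),
        new Event(40, State.DOWN),
        new Event(30, State.UP),
        new Event(50, State.DOWN));
    for (Event e : changes(events)) {
      System.out.println(e.timestampMillis + " " + e.state);
    }
  }
}
```

This only works on a bounded group of values per key and window, which is why it covers just one part of the streaming problem.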

2. Producing output only when the result changes

This aspect corresponds directly to the linked question. Your question is different in that even having defined an associative & commutative operation, you lack a canonical identity element. In the answer there, filtering out of the identity element was key to approximating incremental output.

You could come up with schemes for encoding whether or not a change is necessary, such as expanding your accumulator type to tuples of (timestamp, CHANGE/NO_CHANGE, UP/DOWN) where there is the possibility of a monotonic transition from NO_CHANGE to CHANGE. But this only really helps if you have an identity element tagged with NO_CHANGE. And given an arbitrary choice between UP and DOWN it can only reduce data volume by half.

In your case, the conclusion is that this is not a direct expression of "output only when the combined result has changed". I would more strongly suggest that the right approach is to manage the state machine yourself, using the stateful processing features available in Apache Beam, which will be the basis for Dataflow 2.x.

The stateful DoFn code might look something like this:

new DoFn<KV<DeviceId, UpDown>, KV<DeviceId, UpDown>>() {

  // Timestamp of the last element that produced output for this key.
  @StateId("latestTimestamp")
  private final StateSpec<ValueState<Instant>> latestTimestampSpec =
      StateSpecs.value(InstantCoder.of());

  // Last state that was emitted for this key.
  @StateId("latestOutput")
  private final StateSpec<ValueState<UpDown>> latestOutputSpec =
      StateSpecs.value(UpDown.getCoder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("latestTimestamp") ValueState<Instant> latestTimestampState,
      @StateId("latestOutput") ValueState<UpDown> latestOutputState) {

    // Both reads return null for the first element seen for a key.
    Instant latestTimestamp = latestTimestampState.read();
    UpDown latestOutput = latestOutputState.read();
    // The event timestamp comes from the context, not from the element.
    Instant newTimestamp = c.timestamp();
    UpDown newValue = c.element().getValue();

    boolean isNewer =
        latestTimestamp == null || newTimestamp.isAfter(latestTimestamp);
    if (isNewer && !newValue.equals(latestOutput)) {
      c.output(KV.of(c.element().getKey(), newValue));
      latestTimestampState.write(newTimestamp);
      latestOutputState.write(newValue);
    }
  }
}
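
The comparison at the heart of this DoFn can be tried out without a runner. The following is a rough plain-Java simulation, not Beam code: two `HashMap`s stand in for the per-key state cells, and the class name `ChangeDetector`, the `String` device IDs, and the rule that the first event for a key counts as a change are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of the per-key state machine; names are hypothetical. */
final class ChangeDetector {
  enum State { UP, DOWN }

  // Stand-ins for the "latestTimestamp" and "latestOutput" state cells.
  private final Map<String, Long> latestTimestamp = new HashMap<>();
  private final Map<String, State> latestOutput = new HashMap<>();

  /** Returns true if this event would be emitted as a state change. */
  boolean process(String deviceId, long timestampMillis, State state) {
    Long previousTimestamp = latestTimestamp.get(deviceId);
    State previousOutput = latestOutput.get(deviceId);
    // A key with no stored timestamp has not emitted anything yet.
    boolean isNewer =
        previousTimestamp == null || timestampMillis > previousTimestamp;
    if (isNewer && state != previousOutput) {
      latestTimestamp.put(deviceId, timestampMillis);
      latestOutput.put(deviceId, state);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    ChangeDetector d = new ChangeDetector();
    System.out.println(d.process("dev-1", 10, State.UP));   // true: first event for the key
    System.out.println(d.process("dev-1", 20, State.UP));   // false: same state as last output
    System.out.println(d.process("dev-1", 30, State.DOWN)); // true: state changed
    System.out.println(d.process("dev-1", 25, State.UP));   // false: older than the last emitted change
    System.out.println(d.process("dev-2", 5, State.DOWN));  // true: separate key, separate state
  }
}
```

Note that the stored timestamp only advances when a change is emitted, so a late element is compared against the last change rather than against the last element seen.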

This and the linked question are both inspirations for the example I used in this blog post on the Beam blog. So you might read up there for more details.
