State garbage collection in Beam with GlobalWindow

Problem description

Apache Beam has recently introduced state cells, through StateSpec and the @StateId annotation, with partial support in Apache Flink and Google Cloud Dataflow.

I cannot find any documentation on what happens when this is used with a GlobalWindow. In particular, is there a way to have a "state garbage collection" mechanism that gets rid of state for keys that have not been seen for a while (according to some configuration), while still maintaining a single all-time state for keys that are seen frequently enough?

Or is the amount of state used in this case going to grow without bound, with no way to ever reclaim state for keys that have not been seen in a while?

I am also interested in whether a potential solution would be supported in either Apache Flink or Google Cloud Dataflow.

The Flink and Direct runners seem to have some code for "state GC", but I am not sure what it does or whether it is relevant when using a global window.

Recommended answer

State can be automatically garbage collected by a Beam runner at some point after a window expires, that is, once the input watermark passes the end of the window plus the allowed lateness, so that any further input for that window is droppable. The exact details depend on the runner.
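
To make the expiry rule concrete, here is a simplified model in plain Java. It has no Beam dependency, the class and method names are hypothetical, and real runners implement this with their own internal bookkeeping. State for a window becomes collectable once the input watermark has passed the window's end plus the allowed lateness:

```java
// Simplified model of watermark-based state expiry; not Beam code.
final class WindowExpiry {
  // A window's state may be garbage collected once the input watermark has
  // passed the end of the window plus the allowed lateness: from then on,
  // any further input for that window would be dropped as too late.
  static boolean isExpired(long watermarkMillis, long windowEndMillis, long allowedLatenessMillis) {
    return watermarkMillis > windowEndMillis + allowedLatenessMillis;
  }

  public static void main(String[] args) {
    long windowEnd = 60_000; // end of a hypothetical one-minute window
    long lateness = 30_000;  // hypothetical 30 s of allowed lateness
    System.out.println(isExpired(70_000, windowEnd, lateness)); // false: late data still accepted
    System.out.println(isExpired(95_000, windowEnd, lateness)); // true: state can be reclaimed
  }
}
```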

As you correctly determined, the global window may never expire, in which case this automatic collection of state is never invoked. For bounded data, including drain scenarios, the global window does eventually expire, but for a perpetual unbounded data source it does not.
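
Applying the same simplified rule to the global window shows why nothing is reclaimed for a perpetual unbounded source. The constants below are hypothetical sentinels, not Beam's actual values: the global window's end is placed far beyond any real event time, and an exhausted bounded input or a completed drain is modeled as advancing the watermark to the maximum timestamp.

```java
// Simplified model of why global-window state is never reclaimed for an
// unbounded source; the constants are hypothetical sentinels, not Beam's values.
final class GlobalWindowExpiry {
  // Stand-in for the global window's end: far beyond any real event time.
  static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE / 2;
  // Stand-in for the watermark once a bounded input is exhausted or a drain completes.
  static final long END_OF_TIME_WATERMARK = Long.MAX_VALUE;

  // Same expiry rule as any other window: watermark past end + allowed lateness.
  static boolean isExpired(long watermarkMillis, long allowedLatenessMillis) {
    return watermarkMillis > END_OF_GLOBAL_WINDOW + allowedLatenessMillis;
  }

  public static void main(String[] args) {
    long lateness = 0;                       // assumed: no extra lateness configured
    long liveWatermark = 1_700_000_000_000L; // a present-day event time, in epoch millis
    System.out.println(isExpired(liveWatermark, lateness));         // false, and stays false
    System.out.println(isExpired(END_OF_TIME_WATERMARK, lateness)); // true: bounded input or drain
  }
}
```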

If you are doing stateful processing on such data in the global window, you can use user-defined timers (used through @TimerId, @OnTimer, and TimerSpec; I haven't blogged about these yet) to clear state after a timeout of your choosing. If the state represents an aggregation of some sort, you'll want a timer anyway to make sure your data is not stranded in state.

Here is a quick example of their use:

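import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;

// Stateful DoFns must consume keyed input, so Foo stands for some KV<K, V> type;
// state and timers are scoped per key and window.
new DoFn<Foo, Baz>() {

  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  // Per-key, per-window state cell holding the accumulated value.
  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> bizzleSpec =
      StateSpecs.value(Bizzle.coder());

  // Event-time timer: fires once the input watermark passes its timestamp.
  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    // Re-setting the timer replaces any earlier deadline for this key.
    // (Older Beam releases spelled this myTimer.setForNowPlus(...).)
    myTimer.offset(...).setRelative();
  }

  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    // Emit what the state holds, then clear it so the key's state is reclaimed.
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}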
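
Because every element re-sets the same timer, the example above behaves as an inactivity timeout: a key's state is emitted and cleared only after no element for that key has arrived for the chosen duration. The following plain-Java simulation sketches that behaviour outside Beam. The class, the count used as state, the 10-second timeout, and the simplified "fire at or before the watermark" rule are all hypothetical; the point is that state for idle keys is released while active keys keep theirs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of per-key state cleared by an inactivity timer; not Beam code.
final class InactivityGc {
  private final long timeoutMillis;
  // Per-key state (a running count standing in for ValueState<Bizzle>).
  private final Map<String, Long> state = new HashMap<>();
  // At most one pending timer per key; putting a new deadline overwrites the old
  // one, like re-setting a Beam timer with the same id.
  private final Map<String, Long> timers = new TreeMap<>();
  // Values emitted when timers fire, in firing order.
  final List<String> outputs = new ArrayList<>();

  InactivityGc(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Like @ProcessElement: update the key's state and push its timer out.
  void process(String key, long timestampMillis) {
    state.merge(key, 1L, Long::sum);
    timers.put(key, timestampMillis + timeoutMillis);
  }

  // Like the runner advancing the input watermark: fire every timer whose
  // deadline is at or before the new watermark (simplified firing rule).
  void advanceWatermark(long watermarkMillis) {
    Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> timer = it.next();
      if (timer.getValue() <= watermarkMillis) {
        String key = timer.getKey();
        // Like @OnTimer: emit the aggregate, then clear the key's state.
        outputs.add(key + "=" + state.remove(key));
        it.remove();
      }
    }
  }

  int liveKeys() {
    return state.size();
  }

  public static void main(String[] args) {
    InactivityGc gc = new InactivityGc(10_000); // hypothetical 10 s timeout
    gc.process("a", 0);
    gc.process("b", 1_000);
    gc.process("a", 8_000);      // a's deadline moves from 10_000 to 18_000
    gc.advanceWatermark(12_000); // only b's timer (11_000) fires
    System.out.println(gc.outputs + " live=" + gc.liveKeys()); // [b=1] live=1
    gc.advanceWatermark(20_000); // a's timer (18_000) fires
    System.out.println(gc.outputs + " live=" + gc.liveKeys()); // [b=1, a=2] live=0
  }
}
```

In a real pipeline the runner plays the role of advanceWatermark; keys that keep receiving elements keep pushing their deadline out, so their state persists indefinitely, while keys that go quiet are cleaned up.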
