Apache Beam Stateful DoFn: Periodically Output All K/V Pairs

Question

I'm trying to aggregate a streaming data source per key in Apache Beam (via Scio) using a stateful DoFn (@ProcessElement with @StateId ValueState elements). I thought this would be the most appropriate fit for the problem I'm trying to solve. The requirements are:

  • For a given key, records are aggregated (essentially summed) across all time. I don't care about previously computed aggregates, only the most recent one.
  • Keys may be evicted from the state (state.clear()) based on certain conditions that I control.
  • Every 5 minutes, regardless of whether any new keys were seen, all keys that haven't been evicted from the state should be output.
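To make the three requirements concrete, here is a minimal single-process model in plain Java. It is not Beam code and says nothing about how a runner distributes state; the class name KeyedSumModel and its methods are hypothetical, and the 5-minute schedule is assumed to be driven by some external caller of emitAll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-process model of the requirements above; not Beam code. */
class KeyedSumModel {
    // Running sum per key; a TreeMap keeps the output order deterministic.
    private final Map<String, Long> sums = new TreeMap<>();

    /** Adds a record's value to the running sum for its key. */
    void add(String key, long value) {
        sums.merge(key, value, Long::sum);
    }

    /** Drops a key, analogous to calling state.clear() for that key. */
    void evict(String key) {
        sums.remove(key);
    }

    /** Meant to be called every 5 minutes: emits every key still held. */
    List<String> emitAll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedSumModel m = new KeyedSumModel();
        m.add("a", 1);
        m.add("b", 10);
        m.add("a", 2);
        System.out.println(m.emitAll()); // [a=3, b=10]
        m.evict("b");
        // No new records arrived, but the surviving key is still emitted.
        System.out.println(m.emitAll()); // [a=3]
    }
}
```

The second call to emitAll() is the part that is awkward in a streaming pipeline: output has to be produced for a key even though no new element for that key has arrived.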

Given that this is a streaming pipeline that will run indefinitely, using combinePerKey over a global window with accumulating fired panes seems likely to keep increasing its memory footprint and the amount of data it has to process over time, so I'd like to avoid it. Additionally, when I tested this, it (perhaps as expected) simply appended the newly computed aggregates to the output along with the historical input, rather than using the latest value for each key.

My thought was that a stateful DoFn would simply let me output all of the global state up until now(), but this doesn't seem to be a trivial solution. I've seen hints about using timers to artificially execute callbacks for this, as well as potentially using a slowly growing side input map (How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>) and somehow flushing it, but that would essentially require iterating over all values in the map rather than joining on it.

I feel like I might be overlooking something simple to get this working. I'm relatively new to many of the windowing and timer concepts in Beam, and I'm looking for any advice on how to solve this. Thanks!

Recommended answer

You are right that a stateful DoFn should help you here. This is a basic sketch of what you can do. Note that it outputs only the sum, without the key. It may not be exactly what you want, but it should help you move forward.

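import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  // Processing-time timer that drives the periodic emit.
  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  // Set to true once the key has been evicted; suppresses further output.
  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneSpec = StateSpecs.value();

  // Per-key running sum.
  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception {
    if (shouldEvict(c.element())) {
      // Evict this key: drop its aggregate and mark it as done.
      aggState.clear();
      doneState.write(true);
    } else {
      // Fold the new value into the running sum and (re)arm the timer
      // for the next 5-minute boundary.
      aggState.add(c.element().getValue());
      emitterTimer.align(Duration.standardMinutes(5)).setRelative();
    }
  }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
    Boolean isDone = doneState.read();
    if (isDone != null && isDone) {
      // Key was evicted: emit nothing and do not re-arm the timer.
      return;
    }
    // Emit the current sum for this key.
    context.output(aggState.read());
    // Set the timer to emit again
    emitterTimer.align(Duration.standardMinutes(5)).setRelative();
  }

  // Placeholder standing in for "SOME CONDITION" in the original sketch;
  // replace the body with your own eviction rule.
  private boolean shouldEvict(KV<Integer, Integer> element) {
    return false;
  }
}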
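The call align(Duration.standardMinutes(5)).setRelative() in the sketch asks for the timer to fire at the next 5-minute boundary of processing time rather than exactly 5 minutes after the call. The plain-Java snippet below illustrates that arithmetic only. It is not Beam's implementation: it uses java.time instead of Joda-Time, the class and method names are hypothetical, and returning the current instant when it already sits on a boundary is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrates "round up to the next period boundary"; not Beam code. */
class TimerAlignment {

    /** Returns the first instant at or after now that is a multiple of period since the epoch. */
    static Instant nextAligned(Instant now, Duration period) {
        long periodMillis = period.toMillis();
        // Milliseconds elapsed since the previous boundary.
        long sinceBoundary = Math.floorMod(now.toEpochMilli(), periodMillis);
        // Assumption: an instant already on a boundary is returned unchanged.
        return sinceBoundary == 0 ? now : now.plusMillis(periodMillis - sinceBoundary);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T10:02:30Z");
        // Prints 2024-01-01T10:05:00Z
        System.out.println(nextAligned(now, Duration.ofMinutes(5)));
    }
}
```

One consequence worth noting: because every element for a key re-arms the timer to the same upcoming boundary, a burst of elements within one 5-minute period should still produce a single firing for that key at the boundary.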

Happy to iterate with you on something that'll work.
