Apache Beam Stateful DoFn Periodically Output All K/V Pairs

Question

I'm trying to aggregate (per key) a streaming data source in Apache Beam (via Scio) using a stateful DoFn (using @ProcessElement with @StateId ValueState elements). I thought this would be most appropriate for the problem I'm trying to solve. The requirements are:

  • for a given key, records are aggregated (essentially summed) across all time - I don't care about previously computed aggregates, just the most recent
  • keys may be evicted from the state (state.clear()) based on certain conditions that I control
  • Every 5 minutes, regardless of whether any new keys were seen, all keys that haven't been evicted from the state should be output
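
To make the three requirements concrete, here is a minimal plain-Java model of the intended behavior. It is not Beam code: the class name RunningAggregates and its methods are hypothetical, and a single in-memory map stands in for what Beam keeps as separate per-key state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain in-memory model of the requirements above; hypothetical names, not Beam API.
final class RunningAggregates {
    private final Map<String, Long> sums = new HashMap<>();

    // Requirement 1: keep only the latest running sum for each key.
    void add(String key, long value) {
        sums.merge(key, value, Long::sum);
    }

    // Requirement 2: drop a key when the caller's own condition says so.
    void evict(String key) {
        sums.remove(key);
    }

    // Requirement 3: a scheduler would call this every 5 minutes;
    // it returns a copy of every key that has not been evicted.
    Map<String, Long> snapshot() {
        return new HashMap<>(sums);
    }
}
```

In a stateful DoFn there is no single map to iterate over, because state is scoped to one key (and window) at a time. That is why periodic output has to be driven separately for each key, for example with a timer.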

Given that this is a streaming pipeline and will be running indefinitely, using a combinePerKey over a global window with accumulating fired panes seems like it will continue to increase its memory footprint and the amount of data it needs to run over time, so I'd like to avoid it. Additionally, when testing this out, (maybe as expected) it simply appends the newly computed aggregates to the output along with the historical input, rather than using the latest value for each key.

My thought was that using a stateful DoFn would simply allow me to output all of the global state up until now(), but it seems this isn't a trivial solution. I've seen hints at using timers to artificially execute callbacks for this, as well as potentially using a slowly growing side input map (see "How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>") and somehow flushing it, but this would essentially require iterating over all values in the map rather than joining on it.

I feel like I might be overlooking something simple to get this working. I'm relatively new to many concepts of windowing and timers in Beam, looking for any advice on how to solve this. Thanks!

Recommended answer

You are right that Stateful DoFn should help you here. This is a basic sketch of what you can do. Note that this only outputs the sum without the key. It may not be exactly what you want, but it should help you move forward.

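// Imports assume the Beam 2.x Java SDK package layout.
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  // Processing-time timer that drives the periodic output for each key.
  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  // Set to true once the key has been evicted, so its timer stops re-arming.
  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneSpec = StateSpecs.value();

  // Running per-key sum.
  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>> aggSpec =
      StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  // Hypothetical placeholder for "SOME CONDITION": replace with your own eviction rule.
  private boolean shouldEvict(KV<Integer, Integer> element) {
    return false;
  }

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
    if (shouldEvict(c.element())) {
      // Evict the key: drop its aggregate and mark it done.
      aggState.clear();
      doneState.write(true);
    } else {
      // Fold the new value into the running sum and make sure a timer is pending.
      // Note: as in the original sketch, a key marked done is never un-marked here.
      aggState.add(c.element().getValue());
      emitterTimer.align(Duration.standardMinutes(5)).setRelative();
    }
  }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
    Boolean isDone = doneState.read();
    if (isDone != null && isDone) {
      // Evicted key: emit nothing and do not re-arm the timer.
      return;
    }
    // Emit the current sum (read() returns the Integer result, not the int[] accumulator).
    context.output(aggState.read());
    // Set the timer to emit again.
    emitterTimer.align(Duration.standardMinutes(5)).setRelative();
  }
}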
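
The call emitterTimer.align(Duration.standardMinutes(5)).setRelative() is what produces the 5-minute cadence. As I understand it, align rounds the firing time up to the next multiple of the period, so the arithmetic is roughly as sketched below. The TimerAlignment class and nextAligned method are hypothetical names for illustration only, and Beam's exact handling of a timestamp that already sits on a boundary may differ.

```java
// Illustration of rounding a timestamp up to a period boundary; not Beam API.
final class TimerAlignment {
    // Returns the first multiple of periodMillis that is >= nowMillis (epoch millis).
    // Assumes nowMillis >= 0 and periodMillis > 0.
    static long nextAligned(long nowMillis, long periodMillis) {
        long remainder = nowMillis % periodMillis;
        return remainder == 0 ? nowMillis : nowMillis + (periodMillis - remainder);
    }
}
```

If this reading is right, every key aligns to the same wall-clock boundaries, so all live keys should emit at about the same time every 5 minutes, even though each key has its own timer.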

Happy to iterate with you on something that'll work.
