将计时器设置为所见的最小时间戳 [英] Setting a Timer to the minimum timestamp seen

查看:39
本文介绍了将计时器设置为所见的最小时间戳的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在事件时间中设置一个计时器,该计时器根据在我的 DoFn 中的元素中看到的最小时间戳触发.

I would like to set a Timer in Event time that fires based on the smallest timestamp seen in the elements within my DoFn.

推荐答案

出于性能原因,Timer API 不支持 read() 操作,对于绝大多数用例来说,这不是必需的功能.在需要它的一小部分用例中,例如,当您需要根据在 DoFn 中的元素中看到的最小时间戳在 EventTime 中设置 Timer 时,我们可以使用 State 对象来跟踪值.

For performance reasons the Timer API does not support a read() operation, which for the vast majority of use cases is not a required feature. In the small set of use cases where it is needed, for example when you need to set a Timer in EventTime based on the smallest timestamp seen in the elements within a DoFn, we can make use of a State object to keep track of the value.

Java (SDK 2.10.0)

Java (SDK 2.10.0)

    // In this pattern, a Timer is set to fire based on the lowest timestamp seen in the DoFn. 
public class SetEventTimeTimerBasedOnEarliestElementTime {

  private static final Logger LOG = LoggerFactory
      .getLogger(SetEventTimeTimerBasedOnEarliestElementTime.class);

  public static void main(String[] args) {

    // Create pipeline
    PipelineOptions options = PipelineOptionsFactory.
        fromArgs(args).withValidation().as(PipelineOptions.class);

    // We will start our timer at a fixed point
    Instant now = Instant.parse("2000-01-01T00:00:00Z");

    // ----- Create some dummy data

    // Create 3 elements, incrementing by 1 minute
    TimestampedValue<KV<String, Integer>> time_1 = TimestampedValue.of(KV.of("Key_A", 1), now);

    TimestampedValue<KV<String, Integer>> time_2 = TimestampedValue
        .of(KV.of("Key_A", 2), now.plus(Duration.standardMinutes(1)));

    TimestampedValue<KV<String, Integer>> time_3 = TimestampedValue
        .of(KV.of("Key_A", 3), now.plus(Duration.standardMinutes(2)));

    Pipeline p = Pipeline.create(options);

    // Apply a fixed window of duration 10 min and Sum the results
    p.apply(Create.timestamped(time_3, time_2, time_1)).apply(
        Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(10))))
        .apply(ParDo.of(new StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp()));

    p.run();
  }

  /**
   * Set timer to the lowest value that we see in the stateful DoFn
   */
  public static class StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp
      extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    // Due to performance considerations there is no read on a timer object.
    // We make use of this Long value to keep track.
    @StateId("currentTimerValue") private final StateSpec<ValueState<Long>> currentTimerValue =
        StateSpecs.value(BigEndianLongCoder.of());

    @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement public void process(ProcessContext c,
        @StateId("currentTimerValue") ValueState<Long> currentTimerValue,
        @TimerId("timer") Timer timer) {

      Instant timeStampWeWantToSet = c.timestamp();

      //*********** Set Timer

      // If the timer has never been set then we set it.
      // If the timer has been set but is larger than our current value then we set it.
      if (currentTimerValue.read() == null || timeStampWeWantToSet.getMillis() < currentTimerValue
          .read()) {

        timer.set(timeStampWeWantToSet);
        currentTimerValue.write(timeStampWeWantToSet.getMillis());
      }

    }

    @OnTimer("timer") public void onMinTimer(OnTimerContext otc,
        @StateId("currentTimerValue") ValueState<Long> currentTimerValue,
        @TimerId("timer") Timer timer) {

      // Reset the currentTimerValue
      currentTimerValue.clear();

      LOG.info("Timer @ {} fired", otc.timestamp());

    }

  }

}

这篇关于将计时器设置为所见的最小时间戳的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆