Apache Flink - 如果 x 分钟没有收到数据,则发送事件 [英] Apache Flink - Send event if no data was received for x minutes

查看:47
本文介绍了Apache Flink - 如果 x 分钟没有收到数据,则发送事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用 Flink 的 DataStream API 实现一个操作符,该操作符在一段时间内没有从流中接收到数据时发送事件?

How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?

推荐答案

这样的操作符可以使用 ProcessFunction 来实现.

Such an operator can be implemented using a ProcessFunction.

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

TimeOutFunction 定义如下.在这个例子中,它使用了处理时间.

The TimeOutFunction is defined as follows. In this example it uses processing time.

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) {
    this.timeOut = timeOut;
  }

  @Override
  public void open(Configuration conf) {
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  }

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) {
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    }
  }
}

这篇关于Apache Flink - 如果 x 分钟没有收到数据,则发送事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆