Processing Total Ordering of Events By Key using Apache Beam

Question

I am trying to generate a total (linear) order of event items per key from a real-time stream where the order is event time (derived from the event payload).

I attempted to implement this with a streaming pipeline as follows:

1) Set up non-overlapping sequential windows, e.g. with a duration of 5 minutes

2) Establish an allowed lateness - it is fine to discard late events

3) Set accumulation mode to retain all fired panes

4) Use the "AfterWatermark" trigger

5) When handling a triggered pane, only consider the pane if it is the final one

6) Use a per-key grouping (GroupByKey) to ensure all events in this window for this key will be processed as a unit on a single resource

While this approach ensures a linear order for each key within a given window, it makes no such guarantee across windows. For example, a later window of events for a key could be processed at the same time as an earlier window for that key; this could easily happen if the earlier window failed and had to be retried.

I'm considering adapting this approach so that the real-time stream is first processed to partition the events by key and write them to files named by their window range. Due to the parallel nature of Beam processing, these files will also be generated out of order. A single coordinating process could then submit these files sequentially to a batch pipeline, only submitting the next one once it has received the previous file and downstream processing of it has completed successfully.

The problem is that Apache Beam will only fire a pane if there was at least one element in that time window. Thus, if there are gaps in the events, there could be gaps in the generated files, i.e. missing files. The problem with missing files is that the coordinating batch processor cannot tell whether the time window passed with no data or whether there has been a failure, in which case it cannot proceed until the file finally arrives.
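To make the gap problem concrete, here is a minimal plain-Java sketch (no Beam dependency) of what such a coordinator could compute. The class name, the file naming scheme and the 5-minute window size are assumptions made for illustration only. It can list which window files are absent, but it has no way to tell an empty window from a failed one, which is exactly the ambiguity described above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class WindowGaps {
    // Assumed window size, matching the 5-minute example in the question.
    static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    /** Start of the fixed, non-overlapping window containing the event time. */
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    /** Hypothetical file name: the key plus the window's [start, end) range. */
    static String fileName(String key, Instant start) {
        return key + "_" + start + "_" + start.plus(WINDOW_SIZE);
    }

    /** Window starts in [from, to) for which no file has been seen yet. */
    static List<Instant> missingWindows(
            String key, Instant from, Instant to, Set<String> filesSeen) {
        List<Instant> missing = new ArrayList<>();
        for (Instant w = windowStart(from); w.isBefore(to); w = w.plus(WINDOW_SIZE)) {
            if (!filesSeen.contains(fileName(key, w))) {
                missing.add(w);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2017-01-01T00:00:00Z");
        // Files exist for the first and third windows only.
        Set<String> seen = Set.of(
            fileName("k1", t0),
            fileName("k1", t0.plus(Duration.ofMinutes(10))));
        // The second window is reported as missing, but the coordinator
        // cannot know whether it was empty or whether its write failed.
        System.out.println(
            missingWindows("k1", t0, t0.plus(Duration.ofMinutes(15)), seen));
    }
}
```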

One way to force every event window to fire might be to somehow add dummy events to the stream for each partition and time window. However, this is tricky to do: if there are large gaps in the time sequence and a dummy event arrives surrounded by events with much later timestamps, it will be discarded as late.

Are there other approaches to ensuring there is a trigger for every possible event window, even if that results in outputting empty files?

Is generating a total ordering by key from a realtime stream a tractable problem with Apache Beam? Is there another approach I should be considering?

Recommended answer

Depending on your definition of tractable, it is certainly possible to totally order a stream per key by event timestamp in Apache Beam.

Here are the considerations behind the design:

  1. Apache Beam does not guarantee in-order transport, so a total order is of no use within a pipeline. I will therefore assume you are doing this so you can write to an external system that can only handle events if they arrive in order.
  2. If an event has timestamp t, you can never be certain no earlier event will arrive unless you wait until t is droppable.
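The second consideration can be illustrated outside Beam with a small plain-Java simulation. It is only a sketch under stated assumptions: the class and method names are hypothetical, timestamps are plain longs, and "droppable" is taken to mean that the watermark has passed t plus the allowed lateness. Late elements are not dropped here; the sketch only shows when a buffered timestamp becomes safe to emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class OrderedRelease {
    private final long allowedLateness;
    // Min-heap of buffered event timestamps for a single key.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    OrderedRelease(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Buffers an element; arrival order may differ from timestamp order. */
    void onElement(long timestamp) {
        buffered.add(timestamp);
    }

    /**
     * Releases, in timestamp order, every buffered t for which
     * t + allowedLateness is strictly before the watermark.
     */
    List<Long> onWatermark(long watermark) {
        List<Long> out = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() + allowedLateness < watermark) {
            out.add(buffered.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedRelease release = new OrderedRelease(5);
        release.onElement(12);
        release.onElement(10);
        release.onElement(11);
        System.out.println(release.onWatermark(16)); // prints [10]
        System.out.println(release.onWatermark(20)); // prints [11, 12]
    }
}
```

At watermark 16 only timestamp 10 is released, because an element with timestamp 11 could still legitimately arrive within the allowed lateness of 5.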

So here's how we'll do it:

  1. We'll write a ParDo that uses state and timers (blog post still under review) in the global window. This makes it a per-key workflow.
  2. We'll buffer elements in state when they arrive, so your allowed lateness affects how efficient a data structure you need. What you need is a heap to peek and pop the minimum timestamp and element; there's no built-in heap state, so I'll just write it as a ValueState.
  3. We'll set an event-time timer to receive a callback when an element's timestamp can no longer be contradicted.

I'm going to assume a custom EventHeap data structure for brevity. In practice, you'd want to break this up into multiple state cells to minimize the data transferred. A heap might be a reasonable addition to the primitive types of state.
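For reference, one possible shape for such a heap is sketched below in plain Java on top of java.util.PriorityQueue. This is an illustration, not the answer's actual EventHeap: the Event fields, the use of long timestamps instead of Beam's Instant, and the isEmpty() method are all assumptions, and a real version would also need a coder so it can be stored in a ValueState.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class EventHeapSketch {
    /** Hypothetical event type; only the timestamp matters for ordering. */
    static final class Event {
        final long timestampMillis;
        final String payload;

        Event(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    /** Min-heap of events ordered by event timestamp. */
    static final class EventHeap {
        private final PriorityQueue<Event> queue =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestampMillis));

        void add(Event event) {
            queue.add(event);
        }

        boolean isEmpty() {
            return queue.isEmpty();
        }

        /** Smallest buffered timestamp; throws if the heap is empty. */
        long nextTimestamp() {
            return queue.element().timestampMillis;
        }

        /** Removes and returns the event with the smallest timestamp. */
        Event pop() {
            return queue.remove();
        }
    }

    public static void main(String[] args) {
        EventHeap heap = new EventHeap();
        heap.add(new Event(30, "c"));
        heap.add(new Event(10, "a"));
        heap.add(new Event(20, "b"));
        StringBuilder order = new StringBuilder();
        while (!heap.isEmpty()) {
            order.append(heap.pop().payload);
        }
        System.out.println(order); // prints abc
    }
}
```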

I will also assume that all the coders we need are already registered and focus on the state and timers logic.

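import static com.google.common.base.MoreObjects.firstNonNull;

// Package names as of Beam 2.x.
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// EventHeap, Event, writeToExternalSystem and allowedLateness (a Duration)
// are assumed to be defined elsewhere.
new DoFn<KV<K, Event>, Void>() {

  @StateId("heap")
  private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value();

  @TimerId("next")
  private final TimerSpec nextTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = firstNonNull(
      heapState.read(),
      EventHeap.createForKey(ctx.element().getKey()));
    heap.add(ctx.element().getValue());
    // Persist the updated heap so the timer callback sees it
    heapState.write(heap);
    // When the watermark reaches this time, no more elements
    // can show up that have earlier timestamps
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness));
  }

  @OnTimer("next")
  public void onNextTimestamp(
      OnTimerContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = heapState.read();
    // If the timer at time t was delivered the watermark must
    // be strictly greater than t, so every element whose own
    // deadline (timestamp + allowedLateness) is <= t is safe to emit
    while (!heap.isEmpty()
        && !heap.nextTimestamp().plus(allowedLateness).isAfter(ctx.timestamp())) {
      writeToExternalSystem(heap.pop());
    }
    heapState.write(heap);
    // Only re-arm the timer if elements remain buffered
    if (!heap.isEmpty()) {
      nextTimer.set(heap.nextTimestamp().plus(allowedLateness));
    }
  }
}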

This should hopefully get you started on the way towards whatever your underlying use case is.
