Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?

Problem description

We have a pipeline that starts by receiving messages from PubSub, each with the name of a file. These files are exploded to line level, parsed to JSON object nodes and then sent to an external decoding service (which decodes some encoded data). Object nodes are eventually converted to Table Rows and written to Big Query.

It appeared that Dataflow was not acknowledging the PubSub messages until they arrived at the decoding service. The decoding service is slow, resulting in a backlog when many messages are sent at once. This means that lines associated with a PubSub message can take some time to arrive at the decoding service. As a result, PubSub was receiving no acknowledgement and resending the message. My first attempt to remedy this was adding an attribute to each PubSub message that is passed to the Reader using withAttributeId(). However, on testing, this only prevented duplicates that arrived close together.
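
For reference, the ID-attribute approach looked roughly like the sketch below. This is a minimal, hypothetical fragment: the subscription path and attribute name are placeholders, and the PubsubIO method I am aware of for this is withIdAttribute().

Pipeline p = Pipeline.create(options);

PCollection<String> fileNames = p.apply("Read file names from PubSub",
        PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-subscription")
                // Dataflow deduplicates messages carrying the same value in this attribute,
                // but in my testing this only caught redeliveries that arrived close together.
                .withIdAttribute("uniqueMsgId"));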

My second attempt was to add a fusion breaker (example) after the PubSub read. This simply performs a needless GroupByKey and then ungroups, the idea being that the GroupByKey forces Dataflow to acknowledge the PubSub message.

The fusion breaker discussed above works in that it prevents PubSub from resending messages, but I am finding that this GroupByKey outputs more elements than it receives: See image.

To try and diagnose this I have removed parts of the pipeline to get a simple pipeline that still exhibits this behavior. The behavior remains even when

  • PubSub is replaced with some dummy transforms that send a fixed list of messages, with a slight delay between each (see the sketch after this list).
  • The write transforms are removed.
  • All side inputs/outputs are removed.
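
The dummy replacement for the PubSub read was along these lines (a hypothetical sketch: the real code emitted a fixed list of messages, and GenerateSequence plus a mapping step merely stands in for that here; the element count and rate are arbitrary):

// Stand-in for the PubSub read: emit a fixed number of dummy "file name" messages
// at roughly one element per second, so they arrive with a slight delay between them.
PCollection<String> dummyMessages = p
        .apply("Generate dummy sequence", GenerateSequence.from(0).to(100)
                .withRate(1, Duration.standardSeconds(1)))
        .apply("Convert to dummy messages",
                MapElements.into(TypeDescriptors.strings()).via((Long i) -> "dummy-file-" + i));

dummyMessages.apply("Break fusion", new FusionBreaker<>());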

The behavior I am observing is:

  1. Some of the received messages pass straight through the GroupByKey.
  2. After a certain point in time, messages are "held" by the GroupByKey (presumably due to the backlog downstream of the GroupByKey).
  3. These messages eventually exit the GroupByKey (in groups of size one).
  4. After a short delay (around 3 minutes), the same messages exit the GroupByKey again (still in groups of size one). This can happen several times (I suspect it is proportional to the time they spent waiting to enter the GroupByKey).

Example job id is 2017-10-11_03_50_42-6097948956276262224. I have not run the pipeline on any other runner.

The fusion breaker is as follows:

import java.util.Random;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in")))))
            .apply("Getting iterables after breaking fusion", Values.create())
            .apply("Flattening iterables after breaking fusion", Flatten.iterables())
            .apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out")));
  }

  private PCollection<T> window(PCollection<T> input) {
    return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes());
  }

  private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) {
    return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>()))
            .apply("Grouping by key to break fusion", GroupByKey.create());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }

}

The PassthroughLoggers simply log the elements passing through (I use these to confirm that elements are indeed repeated, rather than there being an issue with the counts).
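
The PassthroughLogger itself is not shown; a minimal reconstruction of what it does might look like the class below (the Level enum values and log format are assumptions, and the imports mirror those of the FusionBreaker):

@Slf4j
public class PassthroughLogger<T> extends DoFn<T, T> {

  public enum Level { Info, Debug }

  private final Level level;
  private final String label;

  public PassthroughLogger(Level level, String label) {
    this.level = level;
    this.label = label;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // Log the element with its label, then emit it unchanged.
    if (level == Level.Info) {
      log.info("{}: {}", label, context.element());
    } else {
      log.debug("{}: {}", label, context.element());
    }
    context.output(context.element());
  }
}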

I suspect this is something to do with windows/triggers, but my understanding is that elements should never be repeated when .discardingFiredPanes() is used - regardless of the windowing setup. I have also tried FixedWindows with no success.
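
For completeness, the FixedWindows variant I tried amounted to swapping the window() method for something like the following (a sketch; the one-minute window size is arbitrary, and allowed lateness has to be set explicitly once a custom trigger is specified):

private PCollection<T> window(PCollection<T> input) {
  return input.apply("Windowing before breaking fusion", Window.<T>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());
}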

Answer

First, the Reshuffle transform is equivalent to your Fusion Breaker, but has some additional performance improvements that should make it preferable.
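
A hedged sketch of that swap, assuming a Beam SDK version in which Reshuffle.viaRandomKey() is available (it assigns random keys, groups, and ungroups internally, so it replaces the RandomKeyFn/GroupByKey/Values/Flatten chain; the variable names are illustrative):

import org.apache.beam.sdk.transforms.Reshuffle;

// Before: fileLines.apply("Break fusion", new FusionBreaker<>())
// After:
PCollection<String> checkpointed =
        fileLines.apply("Break fusion", Reshuffle.viaRandomKey());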

Second, both counters and logging may see an element multiple times if it is retried. As described in the Beam Execution Model, an element at a step may be retried if anything that is fused into it is retried.

Have you actually observed duplicates in what is written as the output of the pipeline?
