Why is my fusion breaker losing or holding back data?

Problem Description

I am working on a streaming Dataflow pipeline that consumes messages containing batches of items from PubSub and eventually writes them to Datastore. For better parallelism, and for timely acknowledgement of the messages pulled from PubSub, I unpack the batches into individual items and add a fusion breaker right after that step.

So the pipeline looks like this ...

PubSubIO -> deserialize -> unpack -> fusion break -> validation/transform -> DatastoreIO.
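
In code, the wiring looks roughly like the sketch below (written in current Beam 2.x style; DeserializeFn, UnpackFn, ValidateAndTransformFn, Item, the subscription, and the project ID are placeholder names, not the actual classes in my pipeline):

// Rough sketch of the pipeline wiring. All DoFn and type names here are
// hypothetical stand-ins for my own classes.
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Deserialize", ParDo.of(new DeserializeFn()))   // message -> batch
    .apply("Unpack", ParDo.of(new UnpackFn()))             // batch -> individual items
    .apply("BreakFusion", new BreakFusionTransform<Item>()) // defined below
    .apply("ValidateTransform", ParDo.of(new ValidateAndTransformFn())) // item -> Entity
    .apply(DatastoreIO.v1().write().withProjectId(projectId));
p.run();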

Here is my fusion breaker, largely copied from the JdbcIO class. It uses a trigger to break up the data in the global window.

import java.util.Random;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // Assign a random key so the subsequent GroupByKey redistributes
        // elements across workers.
        .apply(ParDo.of(new RandomKeyFn<T>()))
        // Stay in the global window, firing repeatedly 2 seconds of
        // processing time after the first element of each pane.
        .apply(Window.<KV<Integer, T>>triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(2L))))
            .discardingFiredPanes())
        // The GroupByKey is what actually breaks fusion: it forces a
        // materialization point between the surrounding stages.
        .apply(GroupByKey.<Integer, T>create())
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }
}

It works most of the time, except on several occasions when it produces fewer outputs than inputs, even after the streaming input is done and the pipeline has been idle for ten minutes.

This shows up in the Dataflow job monitoring console. The screenshot was taken after the job was drained, and after I had waited around 10 minutes for the data to come out of the transform.

Can someone think of an explanation for that? It feels as if the fusion breaker is holding back or has lost some items.

I noticed that it only happens when the data volume / data rate is high, forcing the pipeline to scale up in the middle of the test run, doubling from 25 to 50 n1-highmem-2 workers. However, I haven't done enough tests to verify whether the scaling up is the key to reproducing this problem.

Or maybe the trigger fires too frequently, at once every two seconds?
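
If the two-second processing-time delay is indeed too aggressive under load, one variant I could try (an untested sketch; the 1000-element threshold is an arbitrary number to tune) is to fire on whichever comes first, an element-count bound or the delay, so that high-volume periods produce fewer, larger panes:

// Fire on whichever comes first: 1000 buffered elements, or 2 seconds of
// processing time after the first element of the pane.
.apply(Window.<KV<Integer, T>>triggering(
    Repeatedly.forever(
        AfterFirst.of(
            AfterPane.elementCountAtLeast(1000),
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(2L)))))
    .discardingFiredPanes())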

I am using Dataflow 2.0.0-beta1. The Job Id is "2017-02-23_23_15_34-14025424484787508627".

Recommended Answer

Counters in Streaming Dataflow are best-effort measures; autoscaling in particular may cause larger discrepancies. The pipeline should not lose data in this case.
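
To independently confirm that no data is lost, one option (a sketch, assuming a Beam release that includes the Metrics API; the CountingFn helper and counter names are hypothetical) is to count elements entering and leaving the fusion break and compare the totals after draining:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

// Pass-through DoFn that counts every element it sees.
static class CountingFn<T> extends DoFn<T, T> {
  private final Counter counter;

  CountingFn(String name) {
    this.counter = Metrics.counter(CountingFn.class, name);
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    counter.inc();
    context.output(context.element());
  }
}

// Usage: wrap the fusion break with before/after counters.
//   .apply("CountIn", ParDo.of(new CountingFn<Item>("fusionInput")))
//   .apply(new BreakFusionTransform<Item>())
//   .apply("CountOut", ParDo.of(new CountingFn<Item>("fusionOutput")))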
