Why is my fusion breaker losing or holding back data?


Problem Description


I am working on a streaming Dataflow pipeline that consumes messages containing batched items from PubSub and eventually writes them to Datastore. For better parallelism, and also for timely acknowledgement of the messages pulled from PubSub, I unpack the batches into individual items and add a fusion breaker right after that step.

So the pipeline looks like this ...

PubSubIO -> deserialize -> unpack -> fusion break -> validation/transform -> DatastoreIO.
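
For illustration, the unpack step amounts to a DoFn that re-emits every element of a batch individually. A minimal sketch, assuming a hypothetical `Batch` type with a `getItems()` accessor standing in for my actual deserialized message type:

import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

// Minimal stand-in for the deserialized batch type (hypothetical).
interface Batch<T> {
  List<T> getItems();
}

// Unpack step sketch: emits every item of a batch as its own element,
// so downstream stages can parallelize per item.
class UnpackFn<T> extends DoFn<Batch<T>, T> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    for (T item : context.element().getItems()) {
      context.output(item);
    }
  }
}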

Here is my fusion breaker, largely copied from the JdbcIO class. It uses a trigger to break up the data sitting in the global window.

import java.util.Random;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BreakFusionTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // Tag each element with a random key to spread it across workers.
        .apply(ParDo.of(new RandomKeyFn<T>()))
        // In the global window, fire a pane roughly every two seconds of
        // processing time after the first element arrives.
        .apply(Window.<KV<Integer, T>>triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(2L))))
            .discardingFiredPanes())
        // The GroupByKey materializes the elements, which breaks fusion.
        .apply(GroupByKey.<Integer, T>create())
        // Drop the keys and flatten the grouped values back into items.
        .apply(Values.<Iterable<T>>create())
        .apply(Flatten.<T>iterables());
  }

  private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private Random random;

    @Setup
    public void setup() {
      random = new Random();
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(KV.of(random.nextInt(), context.element()));
    }
  }
}
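
For completeness, the transform is wired in like any other composite. A usage sketch, where `Item` is a hypothetical stand-in for my element type and `unpacked` is the output of the unpack step:

// Insert the fusion breaker between unpack and validation/transform.
PCollection<Item> unfused =
    unpacked.apply("BreakFusion", new BreakFusionTransform<Item>());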

It works most of the time, except on several occasions when it generates fewer outputs than inputs, even after the streaming input is done and the pipeline has been idle for ten minutes.

This can be seen in the Dataflow Job monitoring console (the screenshot is not reproduced here). The screenshot was taken after the job was drained, and after I had waited around ten minutes for the data to come out of the transform.

*Can someone think of an explanation for that? It feels as if the fusion breaker is holding back or has lost some items.*

I noticed that it only happens when the data volume / data rate is high, forcing the pipeline to scale up in the middle of a test run, doubling from 25 to 50 n1-highmem-2 workers. However, I haven't done enough tests to verify whether scaling up is the key to reproducing this problem.

Or maybe the trigger fires too frequently, at once every two seconds?
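
If the two-second firing were the problem, one way to make the trigger less aggressive would be to fire on an element count or a longer processing-time delay, whichever comes first. An untested sketch (not what the job above ran; the count of 1000 and the 10-second delay are arbitrary illustrative values, and it additionally needs AfterFirst and AfterPane from org.apache.beam.sdk.transforms.windowing):

// Drop-in replacement for the windowing step inside BreakFusionTransform:
// fire once 1000 elements have buffered, or 10 seconds after the first
// element in the pane, whichever happens first.
.apply(Window.<KV<Integer, T>>triggering(
        Repeatedly.forever(
            AfterFirst.of(
                AfterPane.elementCountAtLeast(1000),
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10L)))))
    .discardingFiredPanes())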

I am using Dataflow 2.0.0-beta1. The Job Id is "2017-02-23_23_15_34-14025424484787508627".

Solution

Counters in Streaming Dataflow are best-effort measures; autoscaling in particular may cause larger discrepancies. The pipeline should not lose data in this case.
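
One way to cross-check this independently of the console counters is to instrument the pipeline itself. A sketch, assuming a Beam version where the Metrics API is available; note that these metrics are also best-effort in streaming, so the authoritative check remains counting what actually lands in Datastore:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Pass-through DoFn that counts elements; place one instance before the
// fusion breaker and one after, then compare the two totals. The
// namespace and counter names are illustrative.
class CountingFn<T> extends DoFn<T, T> {
  private final Counter elements;

  CountingFn(String name) {
    this.elements = Metrics.counter("fusion-debug", name);
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    elements.inc();
    context.output(context.element());
  }
}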

