How to add de-duplication to a streaming pipeline [apache-beam]

Problem description

I have a working streaming pipeline in Apache Beam [Python] that ingests data from Pub/Sub, performs enrichment in Dataflow, and passes it to BigQuery.

Within the streaming window, I would like to ensure that messages are not duplicated (since Pub/Sub guarantees only at-least-once delivery).

So I figured I'd just use the Distinct transform from Beam, but as soon as I use it my pipeline breaks (it can't proceed any further, and no local prints are visible either).

Here is my pipeline code:

    with beam.Pipeline(options=options) as p:
        message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
                   with_output_types(bytes))

        bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                           | "Deduplication" >> beam.Distinct()
                           | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                           | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                           | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                           | "PreProcessing" >> beam.ParDo(PreProcessing())
                           | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
                   )

        if not known_args.local:
            bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
        else:
            bq_data | "Display" >> beam.ParDo(Display())

As you can see at the "Deduplication" label, I'm calling the beam.Distinct method.

Questions:

  1. Where should de-duplication happen in the pipeline?

  2. Is this even a correct/sane approach?

  3. How else can I de-duplicate streaming buffer data?

  4. Is de-duplication even required, or am I just wasting my time?

Any solutions or suggestions would be highly appreciated. Thanks.

Recommended answer

You may find this blog on Exactly-once processing helpful. First, Dataflow already performs deduplication based on the Pub/Sub record id. However, as the blog states: "In some cases however, this is not quite enough. The user's publishing process might retry publishes".

So, if the system that publishes messages to Pub/Sub may publish the same message multiple times, then you may wish to add your own deterministic record id; Cloud Dataflow will then detect these duplicates. This is the approach I would recommend, instead of trying to deduplicate in your own pipeline.
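
For illustration only, here is a minimal sketch of what the publisher side could look like, assuming the google-cloud-pubsub client library and a hypothetical dedup_id attribute derived by hashing the payload (the project, topic, and attribute names below are assumptions, not from the original question):

    import hashlib

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    # Hypothetical project and topic names.
    topic_path = publisher.topic_path("my-project", "my-topic")

    def publish(payload: bytes):
        # Derive a deterministic id from the message content, so a retried
        # publish of the same payload carries the same attribute value.
        dedup_id = hashlib.sha256(payload).hexdigest()
        publisher.publish(topic_path, payload, dedup_id=dedup_id)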

You can do this by using withIdAttribute on PubSubIO.Read.
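
withIdAttribute is the Java API; since the question's pipeline uses the Python SDK, the counterpart there appears to be the id_label argument of beam.io.ReadFromPubSub (worth confirming against the Beam docs for your SDK version). A minimal sketch, reusing the hypothetical dedup_id attribute from the publisher sketch above:

    import apache_beam as beam

    message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
                       topic=known_args.topic,
                       # Attribute set by the publisher (hypothetical name from the
                       # sketch above); the Dataflow runner deduplicates messages
                       # that share the same value.
                       id_label="dedup_id",
                   ).with_output_types(bytes))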

Some explanation as to why I believe Distinct causes the stuckness: Distinct tries to deduplicate data within a window. I believe you are trying to deduplicate the global window, so your pipeline must buffer and compare all elements, and since this is an unbounded PCollection, it will try to buffer forever.

I believe this will work properly if you perform windowing first and you have deterministic event timestamps (it doesn't look like you are using withTimestampAttribute). Then Distinct will apply only to the elements within each window (and identical elements with identical timestamps will be put in the same window). You may want to see if this works for prototyping, but I recommend adding unique record ids if possible, and letting Dataflow handle deduplication based on the record id for best performance.
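
As a sketch only, re-ordering the pipeline from the question so that the fixed windows are applied before Distinct would look roughly like this (ReadAsJSON, KeepRelevantData, PreProcessing and SendLimitedKeys are the question's own DoFns and are assumed unchanged):

    bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                       | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                       # Distinct now deduplicates per 10-second window instead of
                       # buffering the unbounded global window.
                       | "Deduplication" >> beam.Distinct()
                       | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                       | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                       | "PreProcessing" >> beam.ParDo(PreProcessing())
                       | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
               )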
