如何将重复数据删除添加到流传输管道[apache-beam] [英] How to add de-duplication to a streaming pipeline [apache-beam]

查看:100
本文介绍了如何将重复数据删除添加到流传输管道[apache-beam]的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在apache beam [python]中有一个正在工作的流传输管道,该管道从pub/sub提取数据,对数据流进行扩充,然后将其传递给big-query.

I have a working streaming pipeline in apache beam [python] that ingests data from pub/sub, performs enrichment in dataflow and passes it to big-query.

对于流媒体窗口,我想确保消息不会重复(因为pub/sub至少保证一次传递).

Withing the streaming window, I would like to ensure that messages are not getting duplicated, (as pub/sub guarantees only at least once delivery).

因此,我认为我只会使用与Beam不同的方法,但是一旦使用它,我的管道就会中断(无法继续进行,任何本地打印内容也不可见).

So, I figured I'd just use the distinct method from beam, but as soon as I use it my pipeline breaks (can't proceed any further, any local prints are also not visible).

这是我的管道代码:

    with beam.Pipeline(options=options) as p:
        message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
                   with_output_types(bytes))

        bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                           | "Deduplication" >> beam.Distinct()
                           | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                           | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                           | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                           | "PreProcessing" >> beam.ParDo(PreProcessing())
                           | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
                   )

        if not known_args.local:
            bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
        else:
            bq_data | "Display" >> beam.ParDo(Display())

您会在重复数据删除标签中看到,我正在调用beam.Distinct方法.

As you can see in the de-duplication label, I'm calling the beam.Distinct method.

问题:

  1. 重复数据删除应该在管道中的什么地方进行?

  1. Where should de-duplication happen in the pipeline?

这甚至是正确/理智的方法吗?

Is this even a correct/sane approach?

我还能如何删除流缓冲区数据的重复项?

How else can I de-duplicate streaming buffer data?

是否甚至需要重复数据删除,还是我只是在浪费时间?

Is de-duplication even required, or am I just wasting my time?

任何解决方案或建议将不胜感激.谢谢.

Any solutions or suggestions would be highly appreciated. Thanks.

推荐答案

您可以在一次精确处理很有帮助.首先,Dataflow已经基于发布/订阅记录ID执行重复数据删除.但是,如博客所述:但是,在某些情况下,这还不够.用户的发布过程可能会重试发布."

You may find this blog on Exactly-once processing helpful. First, Dataflow is already performing deduplication based on the pub/sub record id. However, as the blog states: "In some cases however, this is not quite enough. The user’s publishing process might retry publishes".

因此,如果将消息发布到Pub/Sub的系统可能多次发布同一条消息,则您可能希望添加自己的确定性记录ID.然后,Cloud Dataflow将检测到这些.这是我建议的方法,而不是尝试在自己的管道中进行重复数据删除.

So, if your system which is publishing messages to Pub/Sub may be publishing the same message multiple times, then you may wish to add your own deterministic record id. Then Cloud Dataflow will detect these. This is the approach I would recommend, instead of trying to deduplicate in your own pipeline.

您可以使用一些关于为什么我认为相异"会导致卡住的解释.不同尝试在窗口中删除重复数据.我相信您正在尝试对全局窗口进行重复数据删除,因此您的管道必须缓冲并比较所有元素,并且由于这是无界的PCollection.它将尝试永久缓冲.

Some explanation as to why I believe Distinct causes stuckness. Distinct tries to deduplicate data within a Window. I believe you are trying to deduplciate the global window, so your pipeline must buffer and compare all elements, and since this is an unbounded PCollection. It will try to buffer forever.

我相信,如果您先执行窗口化,并且具有确定性的事件时间戳,则此方法将正常工作(看起来您使用的是

I believe this will work properly if you perform windowing first, and you have deterministic event timestamps (It doesn't look like you are using withTimestampAttribute). Then Distinct will apply to just the elements within the window (and identical elements with identical timestamps will be put in the same window). You may want to see if this works for prototyping, but I recommend adding unique record ids if possible, and allowing Dataflow to handle duplication based on record id for best performance.

这篇关于如何将重复数据删除添加到流传输管道[apache-beam]的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆