Beam/Google Cloud Dataflow ReadFromPubSub Missing Data


Problem Description

I have 2 Dataflow streaming pipelines (Pub/Sub to BigQuery) with the following code:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class transform_class(beam.DoFn):

    def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
        # Log the raw Pub/Sub message to Stackdriver so it can be reconciled later.
        logging.info(element)
        yield element


class identify_and_transform_tables(beam.DoFn):
    # Adds the publish timestamp.
    # Since I'm reading from a topic that contains data from multiple tables,
    # the function here is to identify the tables and split them apart.
    ...

def run(pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        lines = (pipeline 
                | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name', with_attributes=True)
                | 'Transforming Messages' >> beam.ParDo(transform_class())
                | 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))

        table_name = lines.table_name
        table_name = (table_name 
                        | 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
                        table='table_name',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                        )

    result = pipeline.run()  # note: the `with` block above already runs the pipeline on exit

Both of these pipelines read from the same Pub/Sub topic. While reconciling, I found that some data was missing, and the missing data differed between the two pipelines. For example,

Rows 56-62 were missing from pipeline 1 but present in pipeline 2.
Rows 90-95 were missing from pipeline 2 but present in pipeline 1.

Hence, the data is present in the Pub/Sub topic.
As you can see in the code, the first function simply logs the Pub/Sub message directly to Stackdriver. I double-checked for the missing data in the Stackdriver logs in addition to BigQuery.

Another thing I've found is that this missing data occurs in chunks of time. For example, rows 56-62 have timestamps around '2019-12-03 05:52:18.754150 UTC', all close to one another (to the millisecond).

Hence, my only conclusion is that Dataflow's ReadFromPubSub has occasions where data just goes missing?
Any assistance is deeply appreciated.

Recommended Answer

I'm not sure what happened in this case, but here is an important rule to follow to prevent data loss:

  • Don't read from a topic, as in beam.io.ReadFromPubSub(topic='topic name').
  • Do read from a subscription, as in beam.io.ReadFromPubSub(subscription='subscription name') (see the sketch below).
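
A minimal sketch of what the read step could look like when pointed at a subscription; the project and subscription names below are placeholders, and the rest of the pipeline is assumed to stay as in the question:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(streaming=True, save_main_session=True)

with beam.Pipeline(options=pipeline_options) as pipeline:
    messages = (
        pipeline
        # Read from a pre-created subscription (full resource path), not from
        # the topic, so messages published while the pipeline is down are kept.
        | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(
            subscription='projects/my-project/subscriptions/my-subscription',
            with_attributes=True))
    # ... the remaining ParDo and WriteToBigQuery steps stay as in the question.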

This is because, in case of a restart, a new subscription is created in the first case, and that subscription may only contain data received after it was created. If you create the subscription beforehand (for example, as sketched after this paragraph), data will be retained in it until it is read (or it expires).
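
The answer assumes the subscription already exists; a minimal sketch of creating it ahead of time with the Pub/Sub client library (the project, topic, and subscription names are placeholders) could look like this:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path('my-project', 'my-topic')
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Create the subscription once, before any pipeline starts; messages published
# to the topic are then retained on it until they are read (or they expire).
with subscriber:
    subscriber.create_subscription(name=subscription_path, topic=topic_path)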

