Beam/Google Cloud Dataflow ReadFromPubsub Missing Data


Problem Description

I have 2 Dataflow streaming pipelines (Pub/Sub to BigQuery) with the following code:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class transform_class(beam.DoFn):
    # Logs every Pub/Sub message (together with its publish timestamp) to Stackdriver.

    def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
        logging.info(element)
        yield element


class identify_and_transform_tables(beam.DoFn):
    # Adding publish timestamp.
    # Since I'm reading from a topic that consists of data from multiple tables,
    # the function here is to identify the tables and split them apart.
    ...  # body omitted in the question


def run(pipeline_args=None):
    # `save_main_session` is set to true because some DoFns rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        lines = (pipeline
                 | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name', with_attributes=True)
                 | 'Transforming Messages' >> beam.ParDo(transform_class())
                 | 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))

        table_name = lines.table_name
        table_name = (table_name
                      | 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
                          table='table_name',
                          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    # Note: the `with` block already runs the pipeline on exit, so the extra
    # pipeline.run() call that followed it in the original code is redundant.

Both of these pipelines read from the same Pub/Sub topic. While reconciling, I found that some data was missing, and the missing data was different for the two pipelines. For example:

Rows 56-62 were missing from pipeline 1 but present in pipeline 2.
Rows 90-95 were missing from pipeline 2 but present in pipeline 1.

Hence, it means the data is present in the Pub/Sub topic.
As you can see in the code, the first function logs the Pub/Sub message directly to Stackdriver. I double-checked for the missing data in the Stackdriver logs in addition to BigQuery.

Another thing I've found is that this missing data occurs in chunks of time. For example, rows 56-62 have timestamps of '2019-12-03 05:52:18.754150 UTC' and very close to it (down to the millisecond).

Hence, my only conclusion is that Dataflow's ReadFromPubSub has occasions where data just goes missing?
Any assistance is deeply appreciated.

Recommended Answer

I'm not sure what happened in this case, but here is an important rule to follow to prevent data loss:

  • Do not read from a topic, i.e. beam.io.ReadFromPubSub(topic='topic name').
  • Read from a subscription instead, i.e. beam.io.ReadFromPubSub(subscription='subscription name').

This is because, in case of a restart, a new subscription is created in the first case, and that subscription may only contain data received after it was created. If you create the subscription beforehand, data is retained in it until it is read (or until it expires).
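
For reference, here is a minimal sketch of the subscription-based approach; the project, topic, and subscription names are placeholders, not values from the question. The subscription is created once up front (for example with gcloud pubsub subscriptions create my-subscription --topic=my-topic), and the pipeline then reads from it:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run(pipeline_args=None):
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        messages = (
            pipeline
            # Read from a pre-created subscription, so messages published
            # while the job is down or restarting are retained and delivered
            # once the pipeline is running again.
            | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(
                subscription='projects/my-project/subscriptions/my-subscription',
                with_attributes=True))
        # ... the rest of the pipeline (ParDo, WriteToBigQuery) stays the same.

Note that each pipeline should get its own subscription: two pipelines pulling from one subscription would split the messages between them, whereas separate subscriptions on the same topic each receive every message.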
