Is it possible to have both Pub/Sub and BigQuery as inputs in Google Dataflow?


Problem description

In my project, I am looking to use a streaming pipeline in Google Dataflow in order to process Pub/Sub messages. In cleaning the input data, I am looking to also have a side input from BigQuery. This has presented a problem that will cause one of the two inputs to not work.

I have set streaming=True in my pipeline options, which allows the Pub/Sub input to process properly. But BigQuery is not compatible with streaming pipelines (see the link below):

https://cloud.google.com/dataflow/docs/resources/faq#what_are_the_current_limitations_of_streaming_mode

I received this error: "ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines." This is understandable based on the limitations.

But I am only looking to use BigQuery as a side input in order to map data to the incoming Pub/Sub data stream. It works fine locally, but once I try to run it on Dataflow, it returns the error.

Has anyone found a good workaround for this?

Adding the framework of my pipeline below for reference:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pvalue import AsList

# Set all options needed to properly run the pipeline
options = PipelineOptions(streaming=True,
                          runner='DataflowRunner', 
                          project=project_id)

p = beam.Pipeline(options = options)

# Side-input source: a bounded read of the nickname table from BigQuery.
# Combining this batch read with the streaming Pub/Sub read below is what
# fails once the pipeline runs on Dataflow, as described above.
n_tbl_src = (p
         | 'Nickname Table Read' >> beam.io.Read(beam.io.BigQuerySource(
            table = nickname_spec
        )))

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=AsList(n_tbl_src))
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()

Recommended answer

@Pablo's comment above was the correct answer: instead of reading the nickname table inside the pipeline with a BigQuerySource, query it up front with the BigQuery client library and pass the resulting rows to the ParDo as a plain Python list. For anyone working through the same situation, below is the change in my script that worked.

import logging

import apache_beam as beam
from google.cloud import bigquery

# This opens the Beam pipeline to run Dataflow
p = beam.Pipeline(options = options)
logging.info('Created Dataflow pipeline.')

# This will pull in all of the recorded nicknames to compare to the incoming PubSubMessages.

client = bigquery.Client()
query_job = client.query("""
    select * from `{0}.{1}.{2}`""".format(project_id, dataset_id, nickname_table_id))
# Materialize the query results as a list of dicts so they can be passed
# to the ParDo below as a plain in-memory side input.
nickname_tbl = query_job.result()
nickname_tbl = [dict(row.items()) for row in nickname_tbl]

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=nickname_tbl)
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()
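
For completeness, here is a minimal sketch of what a DoFn such as FixNicknameFn might look like when it consumes the nickname list built above. This class is not shown in the original question; the element shape and the 'nickname', 'name' and 'first_name' field names are assumptions for illustration only.

class FixNicknameFn(beam.DoFn):
    # Hypothetical sketch: replace a nickname with the formal first name,
    # using the list of dicts queried from BigQuery. The field names
    # ('nickname', 'name', 'first_name') are assumed, not from the question.
    def process(self, element, n_tbl):
        lookup = {row['nickname'].lower(): row['name'] for row in n_tbl}
        record = dict(element)  # copy so the input element is not mutated
        first = record.get('first_name', '')
        record['first_name'] = lookup.get(first.lower(), first)
        yield record

Note that because nickname_tbl is built before p.run(), the side input is captured once at pipeline-construction time; if the nickname table in BigQuery changes, the job has to be redeployed to pick up the new rows.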
