Google Dataflow: running dynamic query with BigQuery + Pub/Sub in Python


Problem description

What I want to do in the pipeline:

  1. Read from Pub/Sub (done)
  2. Transform this data to a dictionary (done)
  3. Take the value of a specified key from the dict (done)
  4. Run a parametrized/dynamic query against BigQuery, in which the WHERE part should look like this:

SELECT field1 FROM Table where field2 = @valueFromP/S
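As a sketch of what that dynamic WHERE clause amounts to, here is a small query builder that keeps the Pub/Sub value out of the SQL string. The table and parameter names are placeholders, and actually binding the parameter is left to the BigQuery client (e.g. via a query parameter object) rather than string formatting:

```python
def build_query(value_from_ps):
    """Build the dynamic query for a value pulled from the Pub/Sub message.

    Uses BigQuery named-parameter syntax (@value_from_ps); the value is
    returned separately so the client can bind it safely, instead of
    being formatted directly into the SQL string.
    """
    sql = "SELECT field1 FROM Table WHERE field2 = @value_from_ps"
    params = {"value_from_ps": value_from_ps}
    return sql, params
```

Returning the SQL and the parameters separately also makes the builder trivial to unit-test without touching BigQuery.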


The pipeline:

| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s:data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>

The normal way to read from BQ would be:

| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
                query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))


I have read about parameterized queries, but I'm not sure if this would work with Apache Beam.

Could it be done using side inputs?

Which would be the best way to do this?

What I have tried:

def parse_methodBQ(input):
    query = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % input['field1']
    return query


class ReadFromBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
                pcoll
                | 'FormatQuery' >> beam.Map(parse_methodBQ)
                | 'Read' >> beam.Map(lambda s:  beam.io.Read(beam.io.BigQuerySource(query=s)))
        )

with beam.Pipeline(options=pipeline_options) as p:
    transform = (p | 'BQ' >> ReadFromBigQuery())

The result (why?):

<Read(PTransform) label=[Read]>

The correct result should be:

{u'Field1': u'string', u'Field2': Bool}


Solution

In the pipeline:

| 'BQ' >> beam.Map(parse_method_BQ)

The function (using the BigQuery 0.25 API for Dataflow):

from google.cloud import bigquery
import time
import uuid

def parse_method_BQ(input):
    client = bigquery.Client()
    QUERY = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % input['field1']
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
        query_job.reload()  # refreshes the job state via a GET request
        if query_job.state == 'DONE':
            if query_job.error_result:
                raise RuntimeError(query_job.errors)
            rows = query_job.results().fetch_data()
            for row in rows:
                if row[0] is not None:
                    return input  # keep the element when the query matched
            return None  # no matching rows: drop the element
        time.sleep(1)
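The DONE-polling loop above can be factored into a reusable helper. A minimal sketch, assuming a job object that exposes reload(), state, and error_result the way the 0.25 client's QueryJob does (the FakeJob used for testing below is hypothetical):

```python
import time

def wait_for_job(job, poll_interval=1.0):
    """Poll a BigQuery-style job until it reaches the DONE state.

    Assumes the job exposes reload(), state and error_result, as the
    google-cloud-bigquery 0.25 QueryJob does. Raises on job errors,
    otherwise returns the finished job.
    """
    while True:
        job.reload()  # refresh the state with a GET request
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.error_result)
            return job
        time.sleep(poll_interval)
```

Separating the wait from the row handling keeps parse_method_BQ focused on the per-element decision.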

Answer

I understand that you will use the parse_methodBQ method to customize the query as needed. As this method returns a query string, you can call BigQuerySource with it; the rows then come back as dictionaries. (The earlier attempt printed <Read(PTransform) label=[Read]> because beam.Map only called the lambda, which constructs a Read transform object per element but never applies it to the pipeline.)

| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=parse_methodBQ(input)))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s: s['data'])

Furthermore, you can avoid having to customize the query at all and use a filter method instead.
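One way to sketch that filter step: read the table once with a fixed query, then keep only the rows matching the value extracted from the Pub/Sub message. The field names are the placeholders from the question, and the row is the dictionary Beam yields per BigQuery row:

```python
def keep_matching(row, wanted):
    """Predicate for beam.Filter: keep BigQuery rows whose 'field1'
    equals the value extracted from the Pub/Sub message."""
    return row.get('field1') == wanted

# Hypothetical wiring in the pipeline:
#   | 'Read' >> beam.io.Read(beam.io.BigQuerySource(
#         query='SELECT field1, field2 FROM table WHERE field2=True'))
#   | 'Filter' >> beam.Filter(keep_matching, wanted_value)
```

This trades per-element queries for one table read plus an in-pipeline filter, which is usually cheaper when the table is small enough to scan.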

Regarding the side inputs, review this example from the cookbook for a better view of how to use them.
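As a rough sketch of the side-input route, the per-element function receives the side value as an extra argument; the names here are illustrative, and the wiring in the comments assumes the usual beam.pvalue helpers:

```python
def filter_with_side_input(row, allowed_values):
    """Called once per element; 'allowed_values' is delivered by Beam as
    a side input (e.g. beam.pvalue.AsList(values_pcoll)) at run time."""
    return row['field1'] in allowed_values

# Hypothetical wiring:
#   values = p | 'Values' >> beam.Create(['a', 'b'])
#   rows | 'Keep' >> beam.Filter(filter_with_side_input,
#                                beam.pvalue.AsList(values))
```

Because the function is plain Python, it can be unit-tested with ordinary lists before being handed to Beam.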
