Google Dataflow: running dynamic query with BigQuery+Pub/Sub in Python


Question

What I want to do in the pipeline:

  1. Read from pub/sub (done)
  2. Transform this data to dictionary (done)
  3. Take the value of a specified key from the dict (done)
  4. Run a parametrized/dynamic query from BigQuery in which the where part should be like this:

SELECT field1 FROM Table where field2 = @valueFromP/S


The pipeline:

| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s:data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>
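The `parse_method` step is not shown in the question; assuming the Pub/Sub payload is a UTF-8 encoded JSON object, a minimal sketch of it could look like this (field names are placeholders from the question):

```python
import json

def parse_method(message):
    """Decode a raw Pub/Sub message (bytes) into a dictionary.

    Assumes the payload is a UTF-8 encoded JSON object.
    """
    return json.loads(message.decode('utf-8'))

print(parse_method(b'{"field1": "string", "field2": true}'))
# → {'field1': 'string', 'field2': True}
```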

The normal way to read from BQ would be like:

| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
                query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))


I have read about parameterized queries, but I'm not sure if this would work with Apache Beam.

Could it be done using side inputs?

What would be the best approach?

What I have tried:

def parse_methodBQ(input):
    query = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % (input['field1'])
    return query


class ReadFromBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
                pcoll
                | 'FormatQuery' >> beam.Map(parse_methodBQ)
                | 'Read' >> beam.Map(lambda s:  beam.io.Read(beam.io.BigQuerySource(query=s)))
        )

with beam.Pipeline(options=pipeline_options) as p:
    transform = (p | 'BQ' >> ReadFromBigQuery())

The result (why this?):

<Read(PTransform) label=[Read]>

The correct result should be:

{u'Field1': u'string', u'Field2': Bool}
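The odd result comes from the lambda in the 'Read' step: it constructs a `beam.io.Read` transform and returns the object itself, so each element is mapped to the transform's repr rather than to query results. A plain-Python analogue (a stand-in class, not Beam itself) reproduces the effect:

```python
class Read:
    """Stand-in for beam.io.Read: a transform object, not data."""
    def __repr__(self):
        return '<Read(PTransform) label=[Read]>'

# Mapping a function that merely *constructs* a transform yields
# the object itself, never the rows it would have read:
result = (lambda s: Read())("SELECT field1 FROM table")
print(result)  # → <Read(PTransform) label=[Read]>
```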


Solution

In the pipeline:

| 'BQ' >> beam.Map(parse_method_BQ))

The function (using the BigQuery 0.25 API for Dataflow):

import time
import uuid

from google.cloud import bigquery


def parse_method_BQ(input):
    client = bigquery.Client()
    QUERY = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % (input['field1'])
    client.use_legacy_sql = False
    query_job = client.run_async_query(
        query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
        query_job.reload()  # Refreshes the state via a GET request.
        if query_job.state == 'DONE':
            if query_job.error_result:
                raise RuntimeError(query_job.errors)
            rows = query_job.results().fetch_data()
            for row in rows:
                if row[0] is not None:
                    return input
            return None  # no matching row: drop the element
        time.sleep(1)
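One caveat with the interpolation above: a value containing a single quote would break the query string. A small escaping helper (my addition, not part of the original solution; BigQuery standard SQL accepts backslash-escaped quotes inside string literals) guards against that:

```python
def escape_value(value):
    """Escape backslashes and single quotes so the value can be embedded
    in a BigQuery standard SQL single-quoted string literal."""
    return value.replace('\\', '\\\\').replace("'", "\\'")

def build_query(field1_value):
    # Mirrors the query built in parse_method_BQ above.
    return ("SELECT field1 FROM table WHERE field1='%s' AND field2=True"
            % escape_value(field1_value))

print(build_query("O'Brien"))
# → SELECT field1 FROM table WHERE field1='O\'Brien' AND field2=True
```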

Answer

You can read the whole table or use a string query.

I understand that you will use the parse_methodBQ method to customize the query as needed. As this method returns a query, you can call it with BigQuerySource. The rows come back as dictionaries.

| 'QueryTable' >> beam.Map(beam.io.BigQuerySource(parse_methodBQ))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s:  s['data'])

Furthermore, you can avoid having to customize the query and use a filter method instead.
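For instance, instead of building a query per element, the table can be read once and the rows filtered inside the pipeline. The predicate below is a sketch (field names are placeholders from the question); it would be wired up with `beam.Filter`:

```python
def keep_row(row, wanted_value):
    """Predicate for beam.Filter: keep BigQuery rows (dicts) whose
    field2 matches the value taken from the Pub/Sub message."""
    return row.get('field2') == wanted_value

# Hypothetical pipeline wiring:
#   | 'Read' >> beam.io.Read(beam.io.BigQuerySource(
#                   query='SELECT field1, field2 FROM table'))
#   | 'Filter' >> beam.Filter(keep_row, wanted_value)

rows = [{'field1': 'a', 'field2': 'x'}, {'field1': 'b', 'field2': 'y'}]
print([r for r in rows if keep_row(r, 'x')])
# → [{'field1': 'a', 'field2': 'x'}]
```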

Regarding the side inputs, review this example from the cookbook to get a better view of how to use them.
