Google Dataflow: running a dynamic query with BigQuery + Pub/Sub in Python
Question
What I want to do in the pipeline:

- Read from Pub/Sub (done)
- Transform this data to a dictionary (done)
- Take the value of a specified key from the dict (done)
- Run a parametrized/dynamic query against BigQuery, in which the WHERE clause should look like this:
SELECT field1 FROM Table where field2 = @valueFromP/S
The pipeline:
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s: data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>
The normal way to read from BigQuery would be:
| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))
I have read about parameterized queries, but I'm not sure whether they work with Apache Beam.
Could it be done using side inputs?
Which would be the best way to do this?
What I have tried:
def parse_methodBQ(input):
    query = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % input['field1']
    return query
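As an aside, interpolating the raw Pub/Sub value with `%s` as above is vulnerable to SQL injection if the payload is untrusted. A minimal sketch of a slightly safer builder (the escaping rule is illustrative and not a substitute for real query parameters):

```python
def build_query(field1_value):
    """Build the dynamic BigQuery query for one Pub/Sub element.

    Escapes backslashes and single quotes in the value so a stray
    quote in the payload cannot terminate the string literal.
    """
    escaped = str(field1_value).replace("\\", "\\\\").replace("'", "\\'")
    return ("SELECT field1 FROM table "
            "WHERE field1='%s' AND field2=True" % escaped)
```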
class ReadFromBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'FormatQuery' >> beam.Map(parse_methodBQ)
            | 'Read' >> beam.Map(lambda s: beam.io.Read(beam.io.BigQuerySource(query=s)))
        )
with beam.Pipeline(options=pipeline_options) as p:
    transform = (p | 'BQ' >> ReadFromBigQuery())
The result (why?):
<Read(PTransform) label=[Read]>
The correct result should be:
{u'Field1': u'string', u'Field2': Bool}
Solution
In the pipeline:
| 'BQ' >> beam.Map(parse_method_BQ))
The function (using the BigQuery 0.25 API for Dataflow):
import time
import uuid

from google.cloud import bigquery

def parse_method_BQ(input):
    client = bigquery.Client()
    QUERY = "SELECT field1 FROM table WHERE field1='%s' AND field2=True" % input['field1']
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
        query_job.reload()  # Refreshes the state via a GET request.
        if query_job.state == 'DONE':
            if query_job.error_result:
                raise RuntimeError(query_job.errors)
            rows = query_job.results().fetch_data()
            for row in rows:
                if row[0] is not None:
                    return input
            return None  # no matching row: drop this element
        time.sleep(1)
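The `run_async_query` call above belongs to the old 0.25 client. In current versions of `google-cloud-bigquery`, the same lookup can be written with `client.query` and real query parameters, which also removes the manual polling loop. A hedged sketch (the dataset/table name `my_dataset.my_table` is a placeholder, and the rest is an assumption about how you would adapt it):

```python
def make_query_and_params(field1_value):
    """Return the parameterized SQL plus a (name, type, value) parameter spec."""
    sql = ("SELECT field1 FROM `my_dataset.my_table` "
           "WHERE field1 = @field1 AND field2 = TRUE")
    return sql, [("field1", "STRING", field1_value)]

def lookup(field1_value):
    """Run the query with the modern google-cloud-bigquery client."""
    # Imported here so the sketch stays importable without GCP dependencies.
    from google.cloud import bigquery
    client = bigquery.Client()
    sql, params = make_query_and_params(field1_value)
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter(n, t, v) for n, t, v in params]
    )
    # .result() blocks until the job is DONE, replacing the reload()/sleep() loop.
    return [dict(row) for row in client.query(sql, job_config=job_config).result()]
```

Because the value is passed as a query parameter rather than interpolated into the SQL, no manual escaping is needed.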
Recommended answer
You can build the query as a string. I understand that you will use the parse_methodBQ method to customize the query as needed. As this method returns a query string, you can call BigQuerySource with it. The rows come back as dictionaries.
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=parse_methodBQ(input)))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s: s['data'])
Furthermore, you can avoid having to customize the query at all and use a filter method instead.

Regarding side inputs, review the example from the cookbook to get a better view of how to use them.
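To illustrate the side-input idea: a small PCollection (e.g. values read once from BigQuery) can be passed into another transform as an extra argument, and the streaming elements filtered against it. A minimal sketch; the filter logic and names below are illustrative, not part of the original question:

```python
def keep_if_allowed(element, allowed_values):
    """Keep a Pub/Sub element only if its field1 appears in the side input."""
    return [element] if element.get("field1") in allowed_values else []

def build_pipeline():
    """Wire the side input; beam is imported here so the sketch stays importable."""
    import apache_beam as beam
    p = beam.Pipeline()
    # In practice this would be a BigQuery read; Create stands in for it here.
    allowed = p | "AllowedValues" >> beam.Create(["a", "b"])
    events = p | "Events" >> beam.Create([{"field1": "a"}, {"field1": "z"}])
    return (events
            | "Filter" >> beam.FlatMap(keep_if_allowed,
                                       allowed_values=beam.pvalue.AsList(allowed)))
```

With `AsList`, the side input is materialized once and handed to every invocation of `keep_if_allowed`, which avoids issuing one BigQuery query per element.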