How can I convert a PCollection to a list in Python Dataflow?
Question
I have a PCollection P1 that contains a field of IDs. I want to take the complete ID column from the PCollection as a list and pass it to a BigQuery query that filters one BigQuery table.
What would be the fastest and most efficient way to do this?
I'm new to Dataflow and big data. Can anyone give me some hints on this?
Thanks!
Answer
From what I understand of your question, you want to build an SQL statement from the IDs you have in P1. Here is one example of how you can achieve this:
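```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

sql = """select ID from `table` WHERE ID IN ({})"""

with beam.Pipeline(options=PipelineOptions()) as p:
    (p | 'Create' >> beam.Create(['1', '2', '3'])
       | 'Combine' >> beam.combiners.ToList()  # whole PCollection -> one list
       # Wrap each ID in double quotes and join them into the IN (...) placeholder.
       | 'Build SQL' >> beam.Map(lambda ids: sql.format(','.join('"{}"'.format(i) for i in ids)))
       | 'Save' >> beam.io.WriteToText('results.csv'))
```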
Result:
select ID from `table` WHERE ID IN ("1","2","3")
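The beam.combiners.ToList() transform combines your whole PCollection into a single list (a PCollection with exactly one element), which I then use to fill the SQL placeholder. PCollections are unordered, so the IDs may appear in any order inside the IN (...) list.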
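To see the string handling on its own, here is a plain-Python sketch of what the 'Build SQL' step does to the list produced by ToList, with no Beam involved. The names SQL_TEMPLATE, quote_id and build_in_query are hypothetical, and the escaping of backslashes and double quotes is an addition not present in the answer's lambda, assuming the IDs are arbitrary strings that must become valid BigQuery string literals.

```python
from typing import Iterable

SQL_TEMPLATE = """select ID from `table` WHERE ID IN ({})"""


def quote_id(value: str) -> str:
    """Wrap one ID in double quotes, escaping backslashes and double quotes
    so the result is a valid BigQuery string literal."""
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    return '"' + escaped + '"'


def build_in_query(ids: Iterable[str], template: str = SQL_TEMPLATE) -> str:
    """Same idea as the 'Build SQL' step: the list from ToList fills IN (...)."""
    return template.format(','.join(quote_id(i) for i in ids))


print(build_in_query(['1', '2', '3']))
# select ID from `table` WHERE ID IN ("1","2","3")
```

Inside the pipeline, the lambda could then simply be replaced by beam.Map(build_in_query).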
You can now use the SQL in the file results.csv-00000-of-00001 (WriteToText appends this shard suffix to the name you give it) to run the query against BQ.
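A minimal sketch of that last step, assuming the google-cloud-bigquery client library is installed and default credentials and a default project are configured. The helper names extract_sql, read_sql_from_shards and run_query are hypothetical, and the glob pattern assumes WriteToText's default shard naming (-SSSSS-of-NNNNN).

```python
import glob
from typing import Iterable, List


def extract_sql(lines: Iterable[str]) -> str:
    """Return the single non-blank line (the SQL statement) from the output shards."""
    statements: List[str] = [line.strip() for line in lines if line.strip()]
    if len(statements) != 1:
        raise ValueError(f"expected exactly one SQL statement, found {len(statements)}")
    return statements[0]


def read_sql_from_shards(pattern: str = "results.csv-*") -> str:
    """Read every shard WriteToText produced, e.g. results.csv-00000-of-00001."""
    paths = sorted(glob.glob(pattern))
    if not paths:
        raise FileNotFoundError(f"no files match {pattern!r}")
    lines: List[str] = []
    for path in paths:
        with open(path) as f:
            lines.extend(f)
    return extract_sql(lines)


def run_query(sql: str) -> list:
    """Execute the query with the google-cloud-bigquery client and return all rows."""
    # Imported here so the helpers above work even without the package installed.
    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default project and credentials
    return list(client.query(sql).result())
```

Usage would be rows = run_query(read_sql_from_shards()). Keeping the parsing in extract_sql makes it easy to check separately, and it fails loudly if the pipeline ever writes more than one statement.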
I'm not sure whether it's possible to run this query directly from the PCollection (something like p | all transformations | beam.io.Write(beam.io.BigQuerySink(result sql)); note that BigQuerySink writes rows into a table rather than executing a query). I suppose reading the end result file and then issuing the query against BQ is the best approach here.