How can I convert a PCollection to a list in Python Dataflow?

Question

I have a PCollection P1 that contains an ID field. I want to take the complete ID column from the PCollection as a list and pass it to a BigQuery query to filter a BigQuery table.

What would be the fastest and most efficient way to do this?

I'm new to Dataflow and big data. Can anyone give me some hints?

Thanks!

Recommended answer

From what I understand of your question, you want to build the SQL statement from the IDs you have in P1. Here is one example of how you can do that:

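```python
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions

sql = """select ID from `table` WHERE ID IN ({})"""
with beam.Pipeline(options=StandardOptions()) as p:
    (p | 'Create' >> beam.Create(['1', '2', '3'])
       | 'Combine' >> beam.combiners.ToList()  # whole PCollection -> one list
       # Quote each ID and fill the IN (...) placeholder; ToList() does not guarantee order, so sort first
       | 'Build SQL' >> beam.Map(lambda ids: sql.format(','.join('"' + i + '"' for i in sorted(ids))))
       | 'Save' >> beam.io.WriteToText('results.csv'))
```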

Result:

select ID from `table` WHERE ID IN ("1","2","3")

The beam.combiners.ToList() transform combines your whole PCollection into a single list (emitted as a one-element PCollection), which I then use to fill the SQL placeholder.
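
If the IDs can contain double quotes or backslashes, wrapping each one as '"' + x + '"' produces invalid (or injectable) SQL. Below is a minimal sketch of a more defensive version of the "Build SQL" step, assuming the IDs are strings. The helper names quote_bq_string and build_in_clause are hypothetical; the escaping relies on BigQuery's backslash escapes inside quoted string literals. It is plain Python, so it can be passed to beam.Map or tested on its own.

```python
def quote_bq_string(value: str) -> str:
    # Hypothetical helper: escape backslashes first, then double quotes, so the
    # value becomes a valid double-quoted BigQuery string literal.
    return '"' + value.replace('\\', '\\\\').replace('"', '\\"') + '"'


def build_in_clause(template: str, ids) -> str:
    # Hypothetical helper: deduplicate and sort so the generated SQL is stable
    # regardless of the order in which ToList() collected the elements.
    return template.format(','.join(quote_bq_string(i) for i in sorted(set(ids))))


sql = """select ID from `table` WHERE ID IN ({})"""
print(build_in_clause(sql, ['3', '1', '2', '1']))
# select ID from `table` WHERE ID IN ("1","2","3")
```

In the pipeline this would replace the lambda, e.g. beam.Map(lambda ids: build_in_clause(sql, ids)). For IDs that come from untrusted input, BigQuery query parameters are the safer choice.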

You can now use the SQL in the file results.csv-00000-of-00001 to run this query against BigQuery.
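
One way to do that, sketched under assumptions: read the query back from the output shard(s) and submit it with the google-cloud-bigquery client library, which must be installed and have credentials configured. The function names read_sql, run_query and run_with_parameters are hypothetical, and table is the same placeholder as in the query above. run_with_parameters shows the alternative of passing the IDs as an array query parameter (WHERE ID IN UNNEST(@ids)) instead of splicing them into the SQL text.

```python
import glob


def read_sql(prefix: str = 'results.csv') -> str:
    # WriteToText names its shards like results.csv-00000-of-00001. ToList()
    # produced a single element, so exactly one non-empty line is expected.
    lines = []
    for path in sorted(glob.glob(prefix + '-*-of-*')):
        with open(path) as f:
            lines.extend(line.strip() for line in f if line.strip())
    if len(lines) != 1:
        raise ValueError(f'expected exactly one query, found {len(lines)}')
    return lines[0]


def run_query(sql: str):
    # Imported lazily: needs the google-cloud-bigquery package and GCP credentials.
    from google.cloud import bigquery
    client = bigquery.Client()
    return list(client.query(sql).result())


def run_with_parameters(ids):
    # Alternative: let BigQuery bind the ID list instead of formatting it into SQL.
    from google.cloud import bigquery
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter('ids', 'STRING', list(ids))])
    query = 'select ID from `table` WHERE ID IN UNNEST(@ids)'
    return list(client.query(query, job_config=job_config).result())
```

Usage: run_query(read_sql()) after the pipeline has finished writing results.csv-00000-of-00001.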

I'm not sure whether it's possible to run this query directly from the pipeline (something like p | all transformations | beam.io.Write(beam.io.BigQuerySink(result_sql))); note that BigQuerySink writes rows to a table rather than executing a query. I think reading the query from the result file and then issuing it against BigQuery is the best approach here.