How can I convert a PCollection to a list in Python Dataflow?

Question

I have a PCollection P1 that contains an ID field. I want to take the complete ID column from the PCollection as a list and pass it to a BigQuery query to filter a BigQuery table.

What would be the fastest and most optimized way of doing this?

I'm new to Dataflow and big data. Can anyone give some hints on this?

Thanks!

Recommended answer

From what I understand of your question, you want to build a SQL statement from the IDs you have in P1. Here is one example of how you can achieve this:

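import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions

sql = """select ID from `table` WHERE ID IN ({})"""
with beam.Pipeline(options=StandardOptions()) as p:
    (p | 'Create' >> beam.Create(['1', '2', '3'])
       # Gather every element into one list, emitted as a single element.
       | 'Combine' >> beam.combiners.ToList()
       # Quote each ID and inject the comma-separated list into the SQL template.
       | 'Build SQL' >> beam.Map(lambda ids: sql.format(','.join('"' + i + '"' for i in ids)))
       | 'Save' >> beam.io.WriteToText('results.csv'))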

Result:

select ID from `table` WHERE ID IN ("1","2","3")

The beam.combiners.ToList() operation collects the data of your whole PCollection into a single list, which I then inject into the SQL placeholder.
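The 'Build SQL' step can also be written as a named function, which makes the quoting explicit and lets it be checked outside a pipeline. The following is a minimal sketch, assuming the IDs are strings; `quote_id` and `build_in_query` are hypothetical helper names, and the escaping is deliberately simple rather than a complete defence against malformed input.

```python
SQL_TEMPLATE = 'select ID from `table` WHERE ID IN ({})'


def quote_id(value):
    """Wrap one ID in double quotes, escaping backslashes and embedded quotes."""
    escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
    return '"' + escaped + '"'


def build_in_query(ids, template=SQL_TEMPLATE):
    """Fill the template with a comma-separated list of quoted IDs."""
    if not ids:
        # An empty IN () clause is not valid SQL, so fail early.
        raise ValueError('at least one ID is required')
    return template.format(','.join(quote_id(i) for i in ids))


print(build_in_query(['1', '2', '3']))
# prints: select ID from `table` WHERE ID IN ("1","2","3")
```

In the pipeline above, `beam.Map(build_in_query)` could stand in for the lambda, since ToList() hands the whole list to the next step as one element.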

You can now take the SQL from the output file results.csv-00000-of-00001 and run it against BQ.

I'm not sure whether this query can be run directly inside the pipeline, though (something like p | all transformations | beam.io.Write(beam.io.BigQuerySink(result sql))). I suppose reading the SQL back from the result file and then issuing the query against BQ would be the best approach here.
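The read-then-query approach could look roughly like the sketch below. It assumes the pipeline wrote its output under the prefix results.csv, that the google-cloud-bigquery package is installed, and that default credentials are configured; `read_sql_statements` and `run_query` are hypothetical helper names.

```python
import glob


def read_sql_statements(prefix='results.csv'):
    """Collect the non-empty lines from every output shard written under prefix."""
    statements = []
    for path in sorted(glob.glob(glob.escape(prefix) + '-*')):
        with open(path, encoding='utf-8') as shard:
            statements.extend(line.strip() for line in shard if line.strip())
    return statements


def run_query(sql):
    """Run one statement against BigQuery and return its rows as a list.

    Assumes google-cloud-bigquery is installed and credentials are available.
    """
    from google.cloud import bigquery  # imported lazily; only this function needs it
    client = bigquery.Client()
    return list(client.query(sql).result())
```

A caller would then loop over `read_sql_statements()` and pass each statement to `run_query`. For long ID lists it may be worth passing the IDs as an array query parameter instead of formatting them into the SQL text.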
