Write BigQuery results to GCS in CSV format using Apache Beam


Question

I am pretty new to Apache Beam. I am trying to write a pipeline that extracts data from Google BigQuery and writes it to GCS in CSV format, using Python.

Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.

Is there a custom function to achieve this? Could you please help me?

import logging

import apache_beam as beam


PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL in BigQuery and store the result data set
        # in the given destination BigQuery table.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='Select * from `dataset.table`', use_standard_sql=True))

        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Answer

You can do so using WriteToText to add a .csv suffix and headers. Take into account that you'll need to parse the query results into CSV format yourself. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA now contains key-value pairs:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

We can apply a beam.Map function to yield only the values:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
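One caveat worth noting (my addition, not part of the original answer): dict.values() does not guarantee a stable column order in older Python versions, so the CSV columns could come out shuffled between rows or runs. A safer sketch selects the fields explicitly; the field order here is assumed from the extract shown below:

```python
# Assumed column order, matching the BQ_VALUES extract in the answer.
FIELDS = ['corpus', 'word', 'word_count']

def to_ordered_values(row, fields=FIELDS):
    """Return a row's values in an explicit, stable column order
    instead of relying on dict.values() ordering."""
    return [row[f] for f in fields]

# In the pipeline this would replace the lambda:
#   BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(to_ordered_values)
print(to_ordered_values({'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407}))
# → ['hamlet', 'HAMLET', 407]
```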

An extract of BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

And finally, map again to have all column values separated by commas instead of a list (take into account that you would need to escape double quotes if they can appear within a field):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

Now we write the results to GCS with the suffix and headers:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')
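A side note (my addition): WriteToText writes sharded output files, which is why the result below is named output-00000-of-00001.csv. The sketch below reproduces what I assume is the default shard naming template; if you need a single CSV file you can pass num_shards=1 to WriteToText, at the cost of parallel writes.

```python
def shard_name(prefix, shard, num_shards, suffix='.csv'):
    """Reproduce WriteToText's shard naming (assumed default
    template: prefix-SSSSS-of-NNNNN + suffix)."""
    return '{}-{:05d}-of-{:05d}{}'.format(prefix, shard, num_shards, suffix)

print(shard_name('output', 0, 1))
# → output-00000-of-00001.csv
```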

The written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
