Write BigQuery results to GCS in CSV format using Apache Beam

Problem description

I am pretty new to Apache Beam, and I am trying to write a pipeline to extract data from Google BigQuery and write it to GCS in CSV format using Python.

Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.

Is there a custom function to achieve this? Could you please help me?

import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition

PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL query in BigQuery and read the result set into a PCollection.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query =  'Select * from `dataset.table`', use_standard_sql=True))
        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
                table='tablename',
                dataset='datasetname',
                project='project_id',
                schema='name:string,gender:string,count:integer',
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()

Answer

You can do so using WriteToText to add a .csv suffix and headers. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA now contains key-value pairs:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
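
As a side note, newer Beam Python SDK releases deprecate beam.io.BigQuerySource in favour of beam.io.ReadFromBigQuery. A minimal sketch of the equivalent read, assuming apache-beam[gcp] 2.25 or later and that the pipeline's temp_location points to a GCS path Beam can use for the underlying BigQuery export:

# Equivalent read with the newer transform (assumption: Beam >= 2.25 with GCP extras).
BQ_DATA = p | 'read_bq_view' >> beam.io.ReadFromBigQuery(
    query=query, use_standard_sql=True)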

We can apply a beam.Map function to yield only values:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Excerpt of BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
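
Note that lambda x: x.values() relies on the ordering of the row dictionary, which is not guaranteed to match the column order of the query (and returns a view object on Python 3). A safer sketch that picks the fields explicitly, using the column names from the example query above:

# Select columns explicitly so the CSV column order is deterministic.
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(
    lambda row: [row['word'], row['word_count'], row['corpus']])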

And finally map again to have all column values separated by commas instead of a list (take into account that you would need to escape double quotes if they can appear within a field):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
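
If double quotes or commas can appear inside a field, a sketch that delegates the quoting and escaping to Python's csv module instead of joining by hand (assuming Python 3; the to_csv_line helper name is just for illustration):

import csv
import io

def to_csv_line(row):
    # Let the csv module handle quoting and escaping of embedded quotes/commas.
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(row)
    return buf.getvalue().rstrip('\r\n')

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(to_csv_line)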

Now we write the results to GCS with the suffix and headers:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

Written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
