Write BigQuery results to GCS in CSV format using Apache Beam
Question
I am pretty new to Apache Beam, and I am trying to write a pipeline to extract data from Google BigQuery and write it to GCS in CSV format using Python.
Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.
Is there a custom function to achieve this? Could you please help me?
import logging
import apache_beam as beam

PROJECT = 'project_id'
BUCKET = 'project_bucket'

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]
    with beam.Pipeline(argv=argv) as p:
        # Execute the SQL in BigQuery and read the result set.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='Select * from `dataset.table`', use_standard_sql=True))
        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help
        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Answer
You can do so using WriteToText to add a .csv suffix and headers. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset and the following query:
SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10
We now read the query results with:
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))
BQ_DATA now contains key-value pairs:
{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
We can apply a beam.Map function to yield only the values:
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())
An excerpt of BQ_VALUES:
[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
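One caveat with the 'read values' step above: dict.values() yields columns in the dictionary's insertion order, which is not guaranteed to match the order of your CSV header. A pure-Python sketch (no Beam required; the sample rows mirror the BQ_DATA output shown above) of selecting fields explicitly instead:

```python
# Sample rows in the same shape as the BigQuery result dictionaries above.
rows = [
    {'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407},
    {'corpus': 'othello', 'word': 'OTHELLO', 'word_count': 313},
]

# Selecting fields by name pins the column order to the CSV header,
# instead of relying on whatever order dict.values() happens to produce.
COLUMNS = ['word', 'word_count', 'corpus']
values = [[row[c] for c in COLUMNS] for row in rows]
print(values)  # [['HAMLET', 407, 'hamlet'], ['OTHELLO', 313, 'othello']]
```

Inside the pipeline, the same idea would be a beam.Map over BQ_DATA with the list comprehension as its lambda body.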
And finally map again to have all column values separated by commas instead of a list (take into account that you would need to escape double quotes if they can appear within a field):
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"' + str(column) + '"' for column in row]))
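If double quotes or commas can appear within fields, Python's csv module handles the escaping for you instead of hand-joining strings. A minimal sketch (the to_csv_line helper is hypothetical; it could replace the lambda above inside beam.Map):

```python
import csv
import io

def to_csv_line(row):
    # Hypothetical helper: serialize one row with full quoting, so embedded
    # double quotes are doubled ("") and commas inside fields stay intact.
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(row)
    return buf.getvalue().rstrip('\r\n')

print(to_csv_line(['hamlet', 'say "no" more', 407]))  # "hamlet","say ""no"" more","407"
```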
Now we write the results to GCS with the suffix and headers:
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET),
    file_name_suffix='.csv',
    header='word, word count, corpus')
Written results:
$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"