How to add a new column with a metadata value to a CSV when loading it to BigQuery

Question
I have a daily CSV file coming into my bucket on Google Cloud Storage, and I built a function that loads this CSV and appends it to a table in BigQuery when it arrives. However, I want to add a new column to the CSV with the function execution ID (context["id"]) before I load the data into BigQuery.
Is it possible? Thanks in advance!
from google.cloud import bigquery


def TimeTableToBigQuery(data, context):
    # Metadata about the uploaded file: bucket, name, and datetime of insert
    execution_id = context['event_id']
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']
    pathtofile = data['id'].rsplit('/', 2)
    file = str(pathtofile[1])
    name = file.split('---')
    dates = name[0].split('_', 1)
    arrivedat = str(dates[1])
    path = pathtofile[0]

    # Configure the load job before sending it to BigQuery
    client = bigquery.Client()
    dataset_id = 'nature_bi'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = [
        bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('RowHashKey', 'STRING', mode='NULLABLE', description='to be written in BQ'),
        bigquery.SchemaField('MergeState', 'INTEGER', mode='NULLABLE', description='for merging data in BQ'),
        bigquery.SchemaField('SourceName', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('SourceScript', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('ArriveDateTime', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('InsertDateTime', 'STRING', mode='NULLABLE', description='Path to file'),
        bigquery.SchemaField('ExecutionID', 'STRING', mode='NULLABLE', description='Path to file')
    ]

    # Build the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (uri, timeCreated))

    tablename = 'employee_time'
    table_id = dataset_ref.table(tablename)

    # Send the configured load job to BigQuery
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)

    # Log some information to track the work
    print('Starting job with ID {}'.format(load_job.job_id))
    print('File: {}'.format(data['name']))
    load_job.result()  # wait for the table load to complete
    print('Job finished.')

    destination_table = client.get_table(dataset_ref.table(tablename))
    print('Loaded {} rows.'.format(destination_table.num_rows))
Answer

You have 3 ways to achieve this:
- Rewrite the file
  - Read the file line by line
  - On each line, add the required field
  - Write to a local file (the /tmp directory is usable; it lives in memory and is limited by the function's memory size)
  - Then load this file into your table
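The rewrite approach above can be sketched like this. This is a minimal sketch: the add_execution_id helper name is my own, and the GCS download/upload and load steps (which would use google-cloud-storage and client.load_table_from_file) are only indicated in comments.

```python
import csv
import io


def add_execution_id(csv_text, execution_id, delimiter=';'):
    """Append an ExecutionID column to every row of a CSV string.

    The header row gets the column name; data rows get the id value.
    """
    reader = csv.reader(io.StringIO(csv_text), delimiter=delimiter)
    out = io.StringIO()
    writer = csv.writer(out, delimiter=delimiter, lineterminator='\n')
    for i, row in enumerate(reader):
        row.append('ExecutionID' if i == 0 else execution_id)
        writer.writerow(row)
    return out.getvalue()


# Inside the Cloud Function you would roughly do (not shown running here):
#   blob = storage.Client().bucket(bucketname).blob(filename)
#   patched = add_execution_id(blob.download_as_text(), context['event_id'])
#   with open('/tmp/patched.csv', 'w') as f:
#       f.write(patched)
#   then load /tmp/patched.csv with client.load_table_from_file(...)
```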
- Load the file as-is into a temporary table
  - Wait for the end of the load job
  - Run a query like this:

    INSERT INTO <your table> SELECT *, CURRENT_TIMESTAMP() AS InsertDateTime, <your executionId> AS ExecutionId FROM <temp table>

  - Then delete the temporary table (or create it in a dataset with a 1-day table expiration). Note, however, that a function can run for at most 9 minutes; if your file is large, doing all of this in one function may take a while. You can build something more complex (I can elaborate if needed). In addition, the query scans all the temporary data to sink it into the final table, which can incur costs if you have a lot of data.
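The query step of this temp-table approach could be sketched as follows. The build_insert_query helper and the table names are illustrative, not part of the original answer; the commented line shows where it would plug into the client from the question.

```python
def build_insert_query(target_table, temp_table, execution_id):
    """Build the INSERT ... SELECT that copies the temp table into the
    final table while adding the two metadata columns."""
    return (
        "INSERT INTO `{target}` "
        "SELECT *, CURRENT_TIMESTAMP() AS InsertDateTime, "
        "'{exec_id}' AS ExecutionId "
        "FROM `{temp}`"
    ).format(target=target_table, temp=temp_table, exec_id=execution_id)


# In the function you would then run (client as in the question):
#   client.query(build_insert_query('nature_bi.employee_time',
#                                   'nature_bi.employee_time_tmp',
#                                   execution_id)).result()
```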
If you perform a WRITE_TRUNCATE (as in your code example), you can do something smarter:
- Delete the previously existing table
- Load the file into a table with a name like this:

  nature_bi_<insertDate>_<executionId>

- When you query, inject the table name into your query result (here I simply add the table name, but with UDFs or native BigQuery functions you can extract the date and execution ID):
SELECT *, (SELECT table_id FROM `<project>.<dataset>.__TABLES_SUMMARY__` WHERE table_id LIKE 'nature_bi%') FROM `<project>.<dataset>.nature_bi*` LIMIT 1000
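The load side of this third approach could look roughly like this; the metadata_table_name helper is my own illustration of encoding the insert date and execution ID in the destination table name.

```python
from datetime import datetime, timezone


def metadata_table_name(execution_id, now=None):
    """Encode the insert date and execution id in the destination
    table name, e.g. nature_bi_20200101_myexecid."""
    now = now or datetime.now(timezone.utc)
    return 'nature_bi_{}_{}'.format(now.strftime('%Y%m%d'), execution_id)


# The load job then targets this table instead of a fixed name:
#   table_id = dataset_ref.table(metadata_table_name(execution_id))
#   client.load_table_from_uri(uri, table_id, job_config=job_config)
```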
All the solutions are valid; which to use depends on your constraints and file size.