Creating a new table and setting the expiration date in BigQuery using Python
Problem description
This is my code that pulls the realtime database from Firebase, formats it as newline-delimited JSON, uploads it to Cloud Storage, and then loads it into BQ.
#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid
from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery
firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())
with open("firetobq.json", "w") as outfile:
    for id in id_keys:
        json.dump(result[id], outfile, indent=None)
        outfile.write("\n")
client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
    blob.upload_from_file(f)
dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'
def load_data_from_gcs(dataset, test12, source):
    bigquery_client = bigquery.Client(dataset)
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table('test12')
    job_name = str(uuid.uuid4())
    job1.create_disposition = 'WRITE_TRUNCATE'
    job1.begin()
    job = bigquery_client.load_table_from_storage(
        job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
    job.source_format = 'NEWLINE_DELIMITED_JSON'
    job.begin()
    wait_for_job(job)
def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)
load_data_from_gcs(dataset, 'test12', source)
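The newline-delimited JSON step above can be exercised on its own; the sample record below is a hypothetical stand-in for the dict returned by `firebase.get('/connection_info', None)`:

```python
import io
import json

# Hypothetical stand-in for the Firebase result (keys are push IDs).
result = {'-Kabc1': {'user_id': 1, 'job_id': 10},
          '-Kabc2': {'user_id': 2, 'job_id': 20}}

outfile = io.StringIO()
for id in map(str, result.keys()):
    json.dump(result[id], outfile, indent=None)
    outfile.write("\n")

# Each line is one standalone JSON object, which is the
# NEWLINE_DELIMITED_JSON format BigQuery load jobs expect.
lines = outfile.getvalue().splitlines()
```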
How can I change this so that, instead of importing the data into table test12, it creates a new table, and also have that table expire after 1 week? (I'm pretty sure the command for setting the expiration date has to be in seconds; 1 week = 604800 seconds.) I know how to set the expiration date via the command line but would rather have it done here automatically.
And this is the error I am receiving after adding job1.
Traceback (most recent call last):
File "firebasetobq2.py", line 63, in <module>
load_data_from_gcs(dataset, 'test12', source)
File "firebasetobq2.py", line 44, in load_data_from_gcs
job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined
If you want to set an expiration time for your table, this might do the trick:
from datetime import datetime, timedelta
from google.cloud.bigquery.schema import SchemaField
def load_data_from_gcs(dataset,
                       table_name,
                       table_schema,
                       source,
                       source_format,
                       expiration_time):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset)
    table = dataset.table(table_name)
    table.schema = table_schema
    table.expires = expiration_time
    if not table.created:
        table.create()
    job_name = str(uuid.uuid4())
    job = bigquery_client.load_table_from_storage(
        job_name, table, source)
    job.source_format = source_format
    job.begin()
    wait_for_job(job)
dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table_schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)
load_data_from_gcs(dataset,
table_name,
table_schema,
gcs_source,
source_format,
expiration_time)
Notice the only real difference is the line of code that sets:
table.expires = expiration_time
Whose value must be of type datetime (here defined as expiration_time = datetime.now() + timedelta(seconds=604800)).
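As a quick check on the seconds arithmetic from the question, timedelta can also take the week directly, which avoids the magic number:

```python
from datetime import datetime, timedelta

# One week expressed both ways; timedelta normalizes them to the same value.
one_week = timedelta(weeks=1)
assert one_week == timedelta(seconds=604800)

expiration_time = datetime.now() + one_week  # value suitable for table.expires
```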
Not sure if it's possible to use schema auto-detection through the Python API, but you can still send this information using SchemaFields. For instance, if your table has two fields, user_id and job_id, both being INTEGERS, then the schema would be:
table_schema = [SchemaField('user_id', field_type='INT64'),
SchemaField('job_id', field_type='INT64')]
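If you'd rather not write the fields out by hand, a rough schema can be derived from one sample record. The helper below is purely illustrative and not part of the BigQuery API; it returns (name, type) pairs that you would then wrap in SchemaField(name, field_type=...):

```python
# Illustrative only: map Python types in one sample record to BigQuery types.
# Real auto-detection would also have to handle nested/repeated fields and NULLs.
def infer_schema(sample):
    type_map = {bool: 'BOOL', int: 'INT64', float: 'FLOAT64', str: 'STRING'}
    return [(name, type_map.get(type(value), 'STRING'))
            for name, value in sample.items()]

pairs = infer_schema({'user_id': 1, 'job_id': 2})
# pairs == [('user_id', 'INT64'), ('job_id', 'INT64')]
```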
More information on how schemas work in BigQuery you can find here.
[EDIT]:
Just saw your other question: if you want to truncate the table and then write data to it, you can just do:
job.create_disposition = 'WRITE_TRUNCATE'
job.begin()
in your load_data_from_gcs function. This will automatically delete the table and create a new one with the data from your storage file. You won't have to define a schema for that, as it was already defined previously (so this might be a much easier solution for you).