Creating a new table and setting the expiration date in bigquery using Python


Problem Description


This is my code that pulls the realtime database from Firebase, formats it as newline-delimited JSON, uploads it to Cloud Storage, and then loads it into BQ.

#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid

from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery

firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())

with open("firetobq.json", "w") as outfile:
  for id in id_keys:
    json.dump(result[id], outfile, indent=None)
    outfile.write("\n")

client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
  blob.upload_from_file(f)

dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'


def load_data_from_gcs(dataset, test12, source):
    bigquery_client = bigquery.Client(dataset)
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table('test12')
    job_name = str(uuid.uuid4())
    job1.create_disposition = 'WRITE_TRUNCATE'
    job1.begin()

    job= bigquery_client.load_table_from_storage(
        job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
    job.source_format = 'NEWLINE_DELIMITED_JSON'

    job.begin()
    wait_for_job(job)

def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

load_data_from_gcs(dataset, 'test12', source)

How can I change this so that, instead of importing the data into table test12, it creates a new table and makes that table expire after 1 week? (I'm pretty sure the command for setting the expiration date has to be in seconds, and 1 week = 604800 seconds.) I know how to set the expiration date via the command line, but would rather have it done automatically here.

And this is the error I am receiving after adding job1.

Traceback (most recent call last):
  File "firebasetobq2.py", line 63, in <module>
    load_data_from_gcs(dataset, 'test12', source)
  File "firebasetobq2.py", line 44, in load_data_from_gcs
    job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined

Solution

If you want to set an expiration time for your table, this might do the trick:

import uuid

from datetime import datetime, timedelta
from google.cloud import bigquery
from google.cloud.bigquery.schema import SchemaField

def load_data_from_gcs(dataset,
                   table_name,
                   table_schema,
                   source,
                   source_format,
                   expiration_time):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset)
    table = dataset.table(table_name)
    table.schema = table_schema
    table.expires = expiration_time
    if not table.created:
        table.create()

    job_name = str(uuid.uuid4())
    job= bigquery_client.load_table_from_storage(
        job_name, table, source)
    job.source_format = source_format

    job.begin()
    wait_for_job(job)

dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table_schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)

load_data_from_gcs(dataset,
                   table_name,
                   table_schema,
                   gcs_source,
                   source_format,
                   expiration_time)

Notice the only difference is the line of code where it sets:

table.expires = expiration_time

Its value must be of type datetime (here defined as expiration_time = datetime.now() + timedelta(seconds=604800)).
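As a small illustration, 1 week = 7 * 24 * 60 * 60 = 604800 seconds, so the same one-week window can be expressed either way with timedelta:

from datetime import datetime, timedelta

# Both expressions produce the same expiration value: one week from now.
expiration_time = datetime.now() + timedelta(seconds=604800)
expiration_time = datetime.now() + timedelta(days=7)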

Not sure if it's possible to use schema auto-detection with the Python API, but you can still send this information using SchemaFields. For instance, if your table has two fields, user_id and job_id, both being integers, then the schema would be:

table_schema = [SchemaField('user_id', field_type='INT64'),
                SchemaField('job_id', field_type='INT64')]

You can find more information on how schemas work in BigQuery here.
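As a slightly fuller sketch (the field names and types here are hypothetical, not taken from the question's data), SchemaField also accepts a mode and an optional description:

from google.cloud.bigquery.schema import SchemaField

# Hypothetical schema; adjust names and types to match your JSON records.
table_schema = [
    SchemaField('user_id', field_type='INT64', mode='REQUIRED',
                description='Unique id of the user'),
    SchemaField('job_id', field_type='INT64', mode='NULLABLE'),
    SchemaField('device', field_type='STRING', mode='NULLABLE'),
]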

[EDIT]:

Just saw your other question: if you want to truncate the table and then write data to it, you can just do:

job.write_disposition = 'WRITE_TRUNCATE'
job.begin()

in your load_data_from_gcs function. This will automatically delete the existing table and create a new one from the data in your storage file. You won't have to define a schema for it, as it has already been defined (therefore this might be a much easier solution for you).
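Put together with the question's original function, that change might look roughly like the sketch below. It is only a sketch: it reuses the wait_for_job helper and the project, dataset, and table names from the question's code, and it sets the disposition on the job object itself (the NameError in the traceback came from referencing an undefined job1; also note that in the BigQuery API, WRITE_TRUNCATE is a write disposition, while create_disposition only accepts CREATE_IF_NEEDED or CREATE_NEVER):

def load_data_from_gcs(dataset, table_name, source):
    bigquery_client = bigquery.Client(dataset)
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table(table_name)

    job_name = str(uuid.uuid4())
    job = bigquery_client.load_table_from_storage(job_name, table, source)
    job.source_format = 'NEWLINE_DELIMITED_JSON'
    # WRITE_TRUNCATE overwrites any existing rows in the table on each load.
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()
    wait_for_job(job)

load_data_from_gcs('dataworks-356fa', 'test12',
                   'gs://dataworks-356fa-backups/firetobq.json')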
