Bigquery - Insert new data row into table by python
Question
I have read a lot of the google bigquery-python documentation, but I can't figure out how to manage BigQuery data from Python code.
First, I create a new table as below.
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials=credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
    ...
]}

table = service.tables().insert(body=table, **dataset_ref).execute()
Then I want to insert data into this table, so I tried the following.
fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body=fetch_list, **table_ref).execute()
But nothing happened.
How can I insert new data rows into a BigQuery table?
Please show me some example code.
Answer
There are a few different ways you can insert data into BQ.
For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat scary, but once you get the hang of it, it's actually quite simple).
There are two main methods that I use to insert data into BQ. The first one is data streaming, and it's meant to be used when you can insert rows one by one in real time. Code example:
import uuid

def stream_data(self, table, data, schema):
    # First check whether the table already exists. If it doesn't, create it.
    r = self.service.tables().list(projectId=your_project_id,
                                   datasetId=your_dataset_id).execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r.get('tables', []) if
                    row['tableReference']['tableId'] == table]
    if not table_exists:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': your_project_id,
                'datasetId': your_dataset_id
            },
            'schema': schema
        }
        self.service.tables().insert(projectId=your_project_id,
                                     datasetId=your_dataset_id,
                                     body=body).execute()
    # With the table created, we can now stream the data.
    # To do so we use the tabledata().insertAll() method; the insertId
    # gives BigQuery a key for de-duplicating retried rows.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.service.tabledata().insertAll(projectId=your_project_id,
                                       datasetId=your_dataset_id,
                                       tableId=table,
                                       body=body).execute(num_retries=5)
Here my self.service corresponds to your service object.
An example of input data that we have in our project:
data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}
And its corresponding schema:
schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
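As a minimal usage sketch (hedged: connector is a hypothetical instance of a class that defines the stream_data method above and holds an authorized service object; the table name is a placeholder), streaming the example row would look like:

# Hypothetical usage: `connector` is assumed to be an instance of the class
# defining stream_data() above, with an authorized BigQuery `self.service`.
connector.stream_data(table='seller_metrics',  # placeholder table name
                      data=data,               # the example row above
                      schema=schema)           # the matching schema above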
The other way to insert data is to use the jobs().insert function. As you can see in the documentation, it accepts several sources for your data. Here is an example of loading the results of a query into another table:
import time  # needed for the polling sleep below

def create_table_from_query(self, query, dest_table, how):
    body = {
        'configuration': {
            'query': {
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': dest_table,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': how,
                'query': query,
            },
        }
    }

    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])

def wait_job_completion(self, job_id):
    # Poll the job until BigQuery reports it as finished.
    while True:
        response = self.connector.jobs().get(projectId=self._project_id,
                                             jobId=job_id).execute()
        if response['status']['state'] == 'DONE':
            return
        time.sleep(1)  # avoid hammering the API while polling
The how argument accepts the options documented for the writeDisposition field (such as "WRITE_TRUNCATE" or "WRITE_APPEND").
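For instance, a hedged usage sketch (the query and table name are placeholders, and connector is again assumed to be an instance of the class above), appending a query result to an existing table:

# Hypothetical example: append the result of a query to an existing table.
connector.create_table_from_query(
    query='SELECT date, metric, result FROM [my_dataset.my_table]',  # placeholder
    dest_table='query_results',  # placeholder destination table
    how='WRITE_APPEND')          # keep any rows already in the table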
You can also load the data from a csv file, for instance; in that case, the configuration section would be defined along these lines:
"configuration": {
"load": {
"fieldDelimiter": "\t"
"sourceFormat": "CSV"
"destinationTable": {
"projectId": your_project_id,
"tableId": table_id,
"datasetId": your_dataset_id
},
"writeDisposition": "WRITE_TRUNCATE"
"schema": schema,
"sourceUris": file_location_in_google_cloud_storage
},
}
(Using a tab-delimited csv file as an example. It could also be a json file; the documentation will walk you through the available options.)
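To tie it together, here's a sketch of submitting such a load job, assuming service, your_project_id, your_dataset_id, table_id and schema are defined as before, and that the source file already sits in Google Cloud Storage (the bucket path is a placeholder):

# Sketch: submit the CSV load job with the configuration shown above.
body = {
    'configuration': {
        'load': {
            'fieldDelimiter': '\t',
            'sourceFormat': 'CSV',
            'destinationTable': {
                'projectId': your_project_id,
                'tableId': table_id,
                'datasetId': your_dataset_id
            },
            'writeDisposition': 'WRITE_TRUNCATE',
            'schema': schema,
            # sourceUris is a list of gs:// URIs; this one is a placeholder
            'sourceUris': ['gs://your_bucket/your_file.csv']
        }
    }
}
response = service.jobs().insert(projectId=your_project_id,
                                 body=body).execute()
print(response['jobReference']['jobId'])  # poll this id until the job is DONE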
Running jobs() requires some time to complete (that's why we created the wait_job_completion method). It should be cheaper, though, compared to real-time streaming.
If you have any questions, let us know.