Bigquery - Insert new data row into table by python


Question

I have read many documents about google-bigquery-python, but I can't understand how to manage BigQuery data with Python code.

First, I made a new table as below.

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials=credentials)

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
               'projectId': project_id}
table_ref = {'tableId': table_id,
             'datasetId': dataset_id,
             'projectId': project_id}

dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
    {'name': 'id', 'type': 'string'},
...
]}

table = service.tables().insert(body=table, **dataset_ref).execute()

Then I wanted to insert data into this table, so I tried the following.

fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)

table = service.tables().patch(body = fetch_list, **table_ref).execute()

But nothing happened.

How can I insert new rows into a BigQuery table?

Please show me some example code.

Solution

There are a few different ways you can insert data into BQ.

For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat scary, but once you get the hang of it, it's rather simple).

There are 2 main methods that I use to insert data into BQ. The first one is data streaming, and it's supposed to be used when you can insert rows one by one in a real-time fashion. Code example:

import uuid

def stream_data(self, table, data, schema):
    # First check whether the table already exists. If it doesn't, create it.
    r = self.service.tables().list(projectId=your_project_id,
                                   datasetId=your_dataset_id).execute()
    # use .get: the 'tables' key is absent when the dataset is empty
    table_exists = [row['tableReference']['tableId'] for row in
                    r.get('tables', []) if
                    row['tableReference']['tableId'] == table]
    if not table_exists:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': your_project_id,
                'datasetId': your_dataset_id
            },
            'schema': schema
        }
        self.service.tables().insert(projectId=your_project_id,
                                     datasetId=your_dataset_id,
                                     body=body).execute()

    # With the table created, we can now stream the data.
    # To do so we use the tabledata().insertAll() method.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.service.tabledata().insertAll(projectId=your_project_id,
                                       datasetId=your_dataset_id,
                                       tableId=table,
                                       body=body).execute(num_retries=5)

Here my self.service corresponds to your service object.

An example of input data that we have in our project:

data = {u'days_validated': '20', u'days_trained': '80',
        u'navigated_score': '1',
        u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5',
        u'init_cv_date': '2016-03-06', u'metric': 'rank',
        u'unix_date': '1461610020241117', u'purchased_score': '10',
        u'result': '0.32677139316724546', u'date': '2016-04-25',
        u'carted_score': '3', u'end_cv_date': '2016-03-25'}

And its corresponding schema:

schema = {u'fields': [
    {u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'},
    {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'},
    {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'},
    {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}
]}
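
To tie the pieces together, here is a minimal, hypothetical sketch of how you might wire up the service object and call stream_data as defined above. The BQ holder class, your_project_id, your_dataset_id, and the 'my_table' name are all illustrative assumptions, not part of the original answer:

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

# Minimal holder so that `self.service` inside stream_data resolves;
# every name here is a placeholder for this sketch.
class BQ(object):
    pass

bq = BQ()
bq.service = build('bigquery', 'v2',
                   credentials=GoogleCredentials.get_application_default())

your_project_id = 'my_project'    # placeholder
your_dataset_id = 'my_dataset'    # placeholder

# Call stream_data as a plain function, passing the holder as `self`.
stream_data(bq, 'my_table', data, schema)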

The other way to insert data is to use the job insert function. As you can see in the documentation, it accepts several sources for your data. I have an example of how you can do so by loading the results of a query into another table:

import time

def create_table_from_query(self,
                            query,
                            dest_table,
                            how):
    body = {
        'configuration': {
            'query': {
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': dest_table,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': how,
                'query': query,
            },
        }
    }

    # self.connector here plays the same role as self.service above
    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])

def wait_job_completion(self, job_id):
    while True:
        response = self.connector.jobs().get(projectId=self._project_id,
                                             jobId=job_id).execute()
        if response['status']['state'] == 'DONE':
            return
        time.sleep(1)  # avoid hammering the API while polling

The how input accepts the values available for this field in the documentation (such as "WRITE_TRUNCATE" or "WRITE_APPEND").
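
For instance, a hedged usage sketch (the query text and the 'daily_summary' destination table are made-up names for illustration; it assumes you are inside the same class that defines create_table_from_query):

# Illustrative call only: query and destination table are placeholders.
query = ("SELECT date, SUM(result) AS total "
         "FROM [my_dataset.my_table] GROUP BY date")
self.create_table_from_query(query, 'daily_summary', 'WRITE_TRUNCATE')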

You can load the data from a csv file, for instance; in this case, the configuration variable would be defined along the lines of:

"configuration": {
  "load": {
    "fieldDelimiter": "\t"
    "sourceFormat": "CSV"
    "destinationTable": {
      "projectId": your_project_id,
      "tableId": table_id,
      "datasetId": your_dataset_id
    },
    "writeDisposition": "WRITE_TRUNCATE"
    "schema": schema,
    "sourceUris": file_location_in_google_cloud_storage
  },
}

(This example uses a csv file delimited by tabs. It could be a json file as well; the documentation will walk you through the available options.)
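
For completeness, a hedged sketch of wrapping that load configuration in an actual job, following the same insert-and-wait pattern as create_table_from_query above. The load_csv_from_gcs name and the gs:// URI are assumptions for illustration, not part of the original answer:

def load_csv_from_gcs(self, table_id, schema, source_uri):
    # Sketch only: wraps the "load" configuration shown above in a
    # jobs().insert() call; source_uri is a hypothetical
    # 'gs://your-bucket/your-file.csv' location.
    body = {
        'configuration': {
            'load': {
                'fieldDelimiter': '\t',
                'sourceFormat': 'CSV',
                'destinationTable': {
                    'projectId': your_project_id,
                    'tableId': table_id,
                    'datasetId': your_dataset_id
                },
                'writeDisposition': 'WRITE_TRUNCATE',
                'schema': schema,
                'sourceUris': [source_uri]
            },
        }
    }
    response = self.connector.jobs().insert(projectId=self._project_id,
                                            body=body).execute()
    self.wait_job_completion(response['jobReference']['jobId'])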

Running jobs() requires some time to complete (that's why we created the wait_job_completion method). It should be cheaper, though, compared to real-time streaming.

Any questions, let us know.
