BigQuery table truncation before streaming not working

This article covers how to deal with BigQuery table truncation not working before streaming; it may be a useful reference for anyone who runs into the same problem.

Problem Description

We are using the BigQuery Python API to run some analyses. To do so, we created the following adapter:

    def stream_data(self, table, data, schema, how=None):
        # Check whether the target table already exists in the dataset.
        r = self.connector.tables().list(projectId=self._project_id,
                                         datasetId='lbanor').execute()
        table_exists = [row['tableReference']['tableId'] for row in
                        r['tables'] if
                        row['tableReference']['tableId'] == table]
        if table_exists:
            if how == 'WRITE_TRUNCATE':
                # Simulate truncation: delete the table and recreate it
                # with the same schema before streaming into it.
                self.connector.tables().delete(projectId=self._project_id,
                                               datasetId='lbanor',
                                               tableId=table).execute()
                body = {
                    'tableReference': {
                        'tableId': table,
                        'projectId': self._project_id,
                        'datasetId': 'lbanor'
                    },
                    'schema': schema
                }
                self.connector.tables().insert(projectId=(
                                               self._project_id),
                                               datasetId='lbanor',
                                               body=body).execute()
        else:
            # The table does not exist yet, so create it first.
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=(
                                           self._project_id),
                                           datasetId='lbanor',
                                           body=body).execute()

        # Stream the row with tabledata().insertAll(), using a random
        # insertId for best-effort de-duplication.
        body = {
            'rows': [
                {
                    'json': data,
                    'insertId': str(uuid.uuid4())
                }
            ]
        }
        self.connector.tabledata().insertAll(projectId=(
                                             self._project_id),
                                             datasetId='lbanor',
                                             tableId=table,
                                             body=body).execute(num_retries=5)
    

where connector is just the service object returned by the API client's build() call.
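
For context, a minimal sketch of how such a connector is typically constructed with the Google API Python client; the credential setup here is an assumption, not part of the original adapter:

    from googleapiclient.discovery import build
    from oauth2client.client import GoogleCredentials

    # Assumption: application-default credentials are available in the
    # environment; any other credentials object would work the same way.
    credentials = GoogleCredentials.get_application_default()
    connector = build('bigquery', 'v2', credentials=credentials)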

Its main purpose is to stream data to the given table. If the table already exists and the "how" input is passed as "WRITE_TRUNCATE", the table is first deleted and created again; after that, the data is streamed.
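
For illustration, a hypothetical call pattern for the adapter; the adapter instance, table name, schema and row values below are made up, not taken from the original code:

    # Hypothetical usage; 'adapter' is an instance of the class that
    # defines stream_data(), and the schema follows the BigQuery v2
    # {'fields': [...]} format.
    schema = {'fields': [{'name': 'date', 'type': 'STRING'},
                         {'name': 'metric', 'type': 'STRING'},
                         {'name': 'result', 'type': 'FLOAT'}]}
    row = {'date': '2016-04-25', 'metric': 'rank', 'result': 0.317}

    # Drop and recreate the table, then stream the row:
    adapter.stream_data('results', row, schema, how='WRITE_TRUNCATE')
    # Later calls with how=None just stream into the existing table:
    adapter.stream_data('results', row, schema)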

Everything works fine as long as the table is not deleted and recreated over and over again.

For instance, this is the result when we run the script without simulating the write-truncation option (a for loop keeps calling stream_data with how=None):

    [
      {
        "date": "2016-04-25",
        "unix_date": "1461606664981207",
        "init_cv_date": "2016-03-12",
        "end_cv_date": "2016-03-25",
        "days_trained": "56",
        "days_validated": "14",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.31729249914663893"
      },
      {
        "date": "2016-04-25",
        "unix_date": "1461606599745107",
        "init_cv_date": "2016-03-06",
        "end_cv_date": "2016-03-25",
        "days_trained": "80",
        "days_validated": "20",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.32677143128667446"
      },
      {
        "date": "2016-04-25",
        "unix_date": "1461606688950415",
        "init_cv_date": "2016-03-14",
        "end_cv_date": "2016-03-25",
        "days_trained": "48",
        "days_validated": "12",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.3129267723358932"
      },
      {
        "date": "2016-04-25",
        "unix_date": "1461606707195122",
        "init_cv_date": "2016-03-16",
        "end_cv_date": "2016-03-25",
        "days_trained": "40",
        "days_validated": "10",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.310620987663015"
      },
      {
        "date": "2016-04-25",
        "unix_date": "1461606622432947",
        "init_cv_date": "2016-03-08",
        "end_cv_date": "2016-03-25",
        "days_trained": "72",
        "days_validated": "18",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.32395802949369296"
      }
    ]
    

But when we use the same adapter with the input how="WRITE_TRUNCATE", its behavior changes and becomes unpredictable.

Sometimes it works and data is saved to the table. But sometimes, even though no error is raised, no data is saved to the table.

When trying to query the table, no data is returned. It just returns "Query returned zero results".

Something goes wrong when deleting the table, creating it again and then streaming the data. Are we making a mistake somewhere?

If you need more info, please let me know. Thanks in advance!

Solution

See Jordan Tigani's answer and Sean Chen's comment at https://stackoverflow.com/a/36417177/132438 (both are BigQuery engineers).

The summary is:

• When re-creating or truncating a table, "You'll need to wait >2 minutes before streaming in order to avoid data being dropped."

So that would explain why you are getting this non-deterministic behavior.
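
A minimal way to apply that guidance to the adapter above is to pause after the WRITE_TRUNCATE branch deletes and recreates the table, before streaming resumes. The sketch below is an illustration only; the delay length follows the ">2 minutes" figure from the linked answer, not a value we have verified:

    import time

    # Inside stream_data(), after the delete/insert pair in the
    # WRITE_TRUNCATE branch and before tabledata().insertAll():
    time.sleep(150)  # a bit over two minutes, per the ">2 minutes" guidance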

This concludes the article on BigQuery table truncation before streaming not working. We hope the answer above is helpful, and thank you for supporting IT屋!
