BigQuery table truncation before streaming not working

Problem Description

We are using the BigQuery Python API to run some analyses. To do so, we created the following adapter:

import uuid

def stream_data(self, table, data, schema, how=None):
    # List the dataset's tables and check whether the target table exists.
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if table_exists:
        if how == 'WRITE_TRUNCATE':
            # Simulate a truncate: delete the table and re-create it
            # with the same schema.
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=self._project_id,
                                           datasetId='lbanor',
                                           body=body).execute()
    else:
        # Table does not exist yet: create it.
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': self._project_id,
                'datasetId': 'lbanor'
            },
            'schema': schema
        }
        self.connector.tables().insert(projectId=self._project_id,
                                       datasetId='lbanor',
                                       body=body).execute()

    # Stream the row; insertId lets BigQuery de-duplicate retried inserts.
    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.connector.tabledata().insertAll(projectId=self._project_id,
                                         datasetId='lbanor',
                                         tableId=table,
                                         body=body).execute(num_retries=5)

where connector is simply the service object returned by the Google API client's build() call.
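
For completeness, the connector is constructed along these lines (a sketch only; the credentials object is assumed to come from your own auth setup, and the credentials keyword requires a reasonably recent google-api-python-client):

from googleapiclient.discovery import build

# Sketch: 'credentials' is assumed to be an authorized google-auth/OAuth2
# credentials object obtained elsewhere in the application.
connector = build('bigquery', 'v2', credentials=credentials)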

Its main purpose is to stream data to the given table. If the table already exists and the "how" argument is passed as "WRITE_TRUNCATE", the table is first deleted and created again; after that, the data is streamed in.
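
For context, the adapter is driven by a loop along these lines (a hypothetical sketch; the adapter instance, table name, schema, and rows are placeholders, not our real values):

# Hypothetical driver loop: stream each result row through the adapter.
schema = {
    'fields': [
        {'name': 'date', 'type': 'STRING'},
        {'name': 'metric', 'type': 'STRING'},
        {'name': 'result', 'type': 'STRING'}
    ]
}

for row in result_rows:  # each row is a dict matching the schema above
    adapter.stream_data('algorithms', row, schema, how='WRITE_TRUNCATE')  # or how=None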

Everything works fine as long as the table is not deleted and re-created over and over again.

For instance, this is the result when we run the script without the write-truncate option (a for loop keeps calling stream_data with how=None):

[
  {
    "date": "2016-04-25",
    "unix_date": "1461606664981207",
    "init_cv_date": "2016-03-12",
    "end_cv_date": "2016-03-25",
    "days_trained": "56",
    "days_validated": "14",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.31729249914663893"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606599745107",
    "init_cv_date": "2016-03-06",
    "end_cv_date": "2016-03-25",
    "days_trained": "80",
    "days_validated": "20",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32677143128667446"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606688950415",
    "init_cv_date": "2016-03-14",
    "end_cv_date": "2016-03-25",
    "days_trained": "48",
    "days_validated": "12",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.3129267723358932"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606707195122",
    "init_cv_date": "2016-03-16",
    "end_cv_date": "2016-03-25",
    "days_trained": "40",
    "days_validated": "10",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.310620987663015"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606622432947",
    "init_cv_date": "2016-03-08",
    "end_cv_date": "2016-03-25",
    "days_trained": "72",
    "days_validated": "18",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32395802949369296"
  }
]

But when we use the same adapter with the input how="WRITE_TRUNCATE", its behavior changes and becomes unpredictable.

Sometimes it works and data is saved to the table. But sometimes, even though no error is raised, no data is saved to the table.

When trying to query the table, no data is returned. It just returns "Query returned zero results".
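
The check is equivalent to the following sketch run through the same discovery-based client (the table name is a placeholder; legacy SQL syntax, as was the default for the v2 API):

# Sketch: query the freshly streamed table through the same API client.
response = self.connector.jobs().query(
    projectId=self._project_id,
    body={'query': 'SELECT * FROM [lbanor.algorithms] LIMIT 10'}
).execute()
print(response.get('rows'))  # missing/None when the query returns zero results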

Something goes wrong when deleting the table, creating it again, and streaming the data. Are we making a mistake somewhere?

If you need more info please let me know. Thanks in advance!

Solution

See Jordan Tigani's answer and Sean Chen's comment on https://stackoverflow.com/a/36417177/132438 (both are BigQuery engineers).

The summary is:

  • When re-creating or truncating a table, "You'll need to wait >2 minutes before streaming in order to avoid data being dropped."

So that would explain why you are getting this non-deterministic behavior.
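
Translated into the adapter above, that means inserting a delay between re-creating the table and the first insertAll call. A minimal sketch of the workaround, assuming a blocking wait is acceptable for your pipeline (the 120-second figure follows directly from the ">2 minutes" guidance above):

import time

# Inside stream_data, right after the delete + insert that re-create the
# table, pause before streaming. Anything much under ~2 minutes risks
# inserts being silently dropped, per the engineers' advice.
time.sleep(120)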
