How to best handle data stored in different locations in Google BigQuery?

Problem Description

My current workflow in BigQuery is as follows:

(1) query data in a public repository (stored in the US), (2) write it to a table in my own repository, (3) export a CSV to a Cloud Storage bucket, (4) download the CSV to the server I work on, and (5) work with it on that server.
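
For concreteness, a minimal sketch of what steps (1) to (3) might look like with the google-cloud-bigquery client is shown below; it is not from the original question, the project, dataset, table, and bucket names are placeholders, and the public table is just an example dataset:

from google.cloud import bigquery

client = bigquery.Client(project='my-project')  # Placeholder project name.

# (1) + (2): query a public dataset (stored in the US) and write the result
# into a destination table in my own dataset.
dest_table = client.dataset('my_dataset').table('my_table')
job_config = bigquery.QueryJobConfig()
job_config.destination = dest_table
query_job = client.query(
    'SELECT name, number '
    'FROM `bigquery-public-data.usa_names.usa_1910_current` LIMIT 1000',
    job_config=job_config)
query_job.result()  # Wait for the query to finish.

# (3): export the destination table to a CSV file in a Cloud Storage bucket
# (CSV is the default extract format).
extract_job = client.extract_table(
    dest_table, 'gs://my-us-bucket/my_table.csv')
extract_job.result()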

The problem I have now is that the server I work on is located in the EU. Thus, I have to pay quite some fees for transferring data between my US bucket and my EU server. I could go ahead and locate my bucket in the EU, but then I would still have the problem of transferring data from the US (BigQuery) to the EU (bucket). I could also set my BigQuery dataset to be located in the EU, but then I can't run any queries any longer, because the data in the public repository is located in the US, and queries between different locations are not allowed.

Does anyone have an idea of how to approach this?

Solution

One way to copy a BigQuery dataset from one region to another is to take advantage of the Storage Transfer Service. It doesn't get around the fact that you still have to pay for bucket-to-bucket network traffic, but it might save you some CPU time on copying data to a server in the EU.

The flow would be to:

  1. Extract all the BigQuery tables into a bucket in the same region as the tables. (Recommend Avro format for best fidelity in data types and fastest loading speed.)
  2. Run a storage transfer job to copy the extracted files from the starting location bucket to a bucket in the destination location.
  3. Load all the files into a BigQuery dataset located in the destination location.

Python example:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time

import googleapiclient.discovery
from google.cloud import bigquery
import json
import pytz


PROJECT_ID = 'swast-scratch'  # TODO: set this to your project name
FROM_LOCATION = 'US'  # TODO: set this to the BigQuery location
FROM_DATASET = 'workflow_test_us'  # TODO: set to BQ dataset name
FROM_BUCKET = 'swast-scratch-us'  # TODO: set to bucket name in same location
TO_LOCATION = 'EU'  # TODO: set this to the destination BigQuery location
TO_DATASET = 'workflow_test_eu'  # TODO: set to destination dataset name
TO_BUCKET = 'swast-scratch-eu'  # TODO: set to bucket name in destination loc

# Construct API clients.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build('storagetransfer', 'v1')


def extract_tables():
    # Extract all tables in a dataset to a Cloud Storage bucket.
    print('Extracting {}:{} to bucket {}'.format(
        PROJECT_ID, FROM_DATASET, FROM_BUCKET))

    tables = list(bq_client.list_tables(bq_client.dataset(FROM_DATASET)))
    extract_jobs = []
    for table in tables:
        job_config = bigquery.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        extract_job = bq_client.extract_table(
            table.reference,
            ['gs://{}/{}.avro'.format(FROM_BUCKET, table.table_id)],
            location=FROM_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the extract job.
        extract_jobs.append(extract_job)

    for job in extract_jobs:
        job.result()

    return tables


def transfer_buckets():
    # Transfer files from one region to another using storage transfer service.
    print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))
    now = datetime.datetime.now(pytz.utc)
    transfer_job = {
        'description': '{}-{}-{}_once'.format(
            PROJECT_ID, FROM_BUCKET, TO_BUCKET),
        'status': 'ENABLED',
        'projectId': PROJECT_ID,
        'transferSpec': {
            'transferOptions': {
                'overwriteObjectsAlreadyExistingInSink': True,
            },
            'gcsDataSource': {
                'bucketName': FROM_BUCKET,
            },
            'gcsDataSink': {
                'bucketName': TO_BUCKET,
            },
        },
        # Set start and end date to today (UTC) without a time part to start
        # the job immediately.
        'schedule': {
            'scheduleStartDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
            'scheduleEndDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
        },
    }
    transfer_job = transfer_client.transferJobs().create(
        body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(transfer_job, indent=4)))

    # Find the operation created for the job.
    job_filter = {
        'project_id': PROJECT_ID,
        'job_names': [transfer_job['name']],
    }

    # Wait until the operation has started.
    response = {}
    while ('operations' not in response) or (not response['operations']):
        time.sleep(1)
        response = transfer_client.transferOperations().list(
            name='transferOperations', filter=json.dumps(job_filter)).execute()

    operation = response['operations'][0]
    print('Returned transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Wait for the transfer to complete.
    print('Waiting ', end='')
    while operation['metadata']['status'] == 'IN_PROGRESS':
        print('.', end='')
        sys.stdout.flush()
        time.sleep(5)
        operation = transfer_client.transferOperations().get(
            name=operation['name']).execute()
    print()

    print('Finished transferOperation: {}'.format(
        json.dumps(operation, indent=4)))


def load_tables(tables):
    # Load all tables into the new dataset.
    print('Loading tables from bucket {} to {}:{}'.format(
        TO_BUCKET, PROJECT_ID, TO_DATASET))

    load_jobs = []
    for table in tables:
        dest_table = bq_client.dataset(TO_DATASET).table(table.table_id)
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.AVRO
        load_job = bq_client.load_table_from_uri(
            ['gs://{}/{}.avro'.format(TO_BUCKET, table.table_id)],
            dest_table,
            location=TO_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the load job.
        load_jobs.append(load_job)

    for job in load_jobs:
        job.result()


# Actually run the script.
tables = extract_tables()
transfer_buckets()
load_tables(tables)

The preceding sample uses the google-cloud-bigquery library for the BigQuery API and the google-api-python-client library for the Storage Transfer API.
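Both dependencies (plus pytz) are available on PyPI; if you want to run the sample, something like pip install google-cloud-bigquery google-api-python-client pytz should pull them in, though exact versions may need pinning since the sample targets the 0.32.0-era client API.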

Note that this sample does not account for partitioned tables.
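
If partitioned tables do need to be copied, one possible extension (a sketch, not part of the original answer) is to extract each partition separately using partition decorators. The helper below assumes date/ingestion-time partitioning and a newer google-cloud-bigquery version that provides Client.list_partitions():

def extract_partitioned_table(table):
    # Hypothetical helper: extract one Avro file per partition so the
    # partition layout can be recreated on the destination side.
    dataset_ref = bq_client.dataset(FROM_DATASET)
    jobs = []
    for partition_id in bq_client.list_partitions(table.reference):
        # A "table$YYYYMMDD"-style decorator addresses a single partition.
        partition_ref = dataset_ref.table(
            '{}${}'.format(table.table_id, partition_id))
        job_config = bigquery.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        jobs.append(bq_client.extract_table(
            partition_ref,
            ['gs://{}/{}_{}.avro'.format(
                FROM_BUCKET, table.table_id, partition_id)],
            location=FROM_LOCATION,
            job_config=job_config))
    for job in jobs:
        job.result()

On the load side, each extracted file would then be loaded into the matching "table$partition_id" partition of a partitioned destination table, mirroring the per-table loop in load_tables().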
