Python BigQuery allowLargeResults with pandas.io.gbq


Problem description

I want to use the Pandas library to read BigQuery data. How do I allow large results?
For non-Pandas BigQuery interactions, this can be achieved like this.
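
(For reference, here is a minimal sketch of the non-Pandas route, written against the same older google-cloud-bigquery client generation used in the accepted answer below; the project, dataset, and table names are placeholders, and the attribute names should be verified against the installed client version.)

import uuid
from google.cloud import bigquery

client = bigquery.Client(project="project-id")

# allowLargeResults only works with legacy SQL and needs an explicit destination table
destination = client.dataset("dataset_name").table("my_large_results")

job = client.run_async_query(str(uuid.uuid4()),
                             "SELECT column1, column2 FROM [dataset_name.tablename]")
job.allow_large_results = True            # the allowLargeResults flag
job.destination = destination             # large results must be written to a table
job.write_disposition = 'WRITE_TRUNCATE'

job.begin()                               # then poll job.state until it reads 'DONE'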

Current code with Pandas:

sProjectID = "project-id"
sQuery = '''
    SELECT 
        column1, column2
    FROM [dataset_name.tablename]
'''
from pandas.io import gbq
df = gbq.read_gbq(sQuery, sProjectID)

Solution

I decided to post the proper way to do this via the python3 google.cloud API. Looking at my previous answer, I see that it would fail as yosemite_k said.

Large results really need to follow the BigQuery -> Storage -> local -> dataframe pattern.

Installation:

pip install pandas
pip install google-cloud-storage
pip install google-cloud-bigquery

Full implementation (bigquery_to_dataframe.py):

"""
We require python 3 for the google cloud python API
    mkvirtualenv --python `which python3` env3
And our dependencies:
    pip install pandas
    pip install google-cloud-bigquery
    pip install google-cloud-storage
"""
import os
import time
import uuid

from google.cloud import bigquery
from google.cloud import storage
import pandas as pd


def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path):
    """Pipeline to get data from BigQuery into a local pandas dataframe.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset id.
    :type dataset_id: str
    :param table_id: BigQuery table id.
    :type table_id: str
    :param storage_uri: Google Storage uri where data gets dropped off.
    :type storage_uri: str
    :param local_data_path: Path where data should end up.
    :type local_data_path: str
    :return: Pandas dataframe from BigQuery table.
    :rtype: pd.DataFrame
    """
    bq_to_storage(project_id, dataset_id, table_id, storage_uri)

    storage_to_local(project_id, storage_uri, local_data_path)

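    # NOTE: storage_to_local mirrors the blob path under local_data_path, so this
    # assumes the Storage folder in storage_uri is named "test_data"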
    data_dir = os.path.join(local_data_path, "test_data")
    df = local_to_df(data_dir)

    return df


def bq_to_storage(project_id, dataset_id, table_id, target_uri):
    """Export a BigQuery table to Google Storage.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset name where source data resides.
    :type dataset_id: str
    :param table_id: BigQuery table name where source data resides.
    :type table_id: str
    :param target_uri: Google Storage location where table gets saved.
    :type target_uri: str
    :return: The random ID generated to identify the job.
    :rtype: str
    """
    client = bigquery.Client(project=project_id)

    dataset = client.dataset(dataset_name=dataset_id)
    table = dataset.table(name=table_id)

    job = client.extract_table_to_storage(
        str(uuid.uuid4()),  # id we assign to be the job name
        table,
        target_uri
    )
    job.destination_format = 'CSV'
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()  # async execution

    if job.errors:
        print(job.errors)

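    # poll the extract job until BigQuery reports it as DONE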
    while job.state != 'DONE':
        time.sleep(5)
        print("exporting '{}.{}' to '{}':  {}".format(
            dataset_id, table_id, target_uri, job.state
        ))
        job.reload()

    print(job.state)

    return job.name


def storage_to_local(project_id, source_uri, target_dir):
    """Save a file or folder from google storage to a local directory.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param source_uri: Google Storage location where file comes form.
    :type source_uri: str
    :param target_dir: Local file location where files are to be stored.
    :type target_dir: str
    :return: None
    :rtype: None
    """
    client = storage.Client(project=project_id)

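    # split "gs://bucket/path/to/objects" into the bucket name and the object path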
    bucket_name = source_uri.split("gs://")[1].split("/")[0]
    file_path = "/".join(source_uri.split("gs://")[1].split("/")[1::])
    bucket = client.lookup_bucket(bucket_name)

    folder_name = "/".join(file_path.split("/")[0:-1]) + "/"
    blobs = [o for o in bucket.list_blobs() if o.name.startswith(folder_name)]

    # get files if we wanted just files
    blob_name = file_path.split("/")[-1]
    if blob_name != "*":
        print("Getting just the file '{}'".format(file_path))
        our_blobs = [o for o in blobs if o.name.endswith(blob_name)]
    else:
        print("Getting all files in '{}'".format(folder_name))
        our_blobs = blobs

    print([o.name for o in our_blobs])

    for blob in our_blobs:
        filename = os.path.join(target_dir, blob.name)

        # create a complex folder structure if necessary
        if not os.path.isdir(os.path.dirname(filename)):
            os.makedirs(os.path.dirname(filename))

        with open(filename, 'wb') as f:
            blob.download_to_file(f)


def local_to_df(data_path):
    """Import local data files into a single pandas dataframe.

    :param data_path: File or folder path where csv data are located.
    :type data_path: str
    :return: Pandas dataframe containing data from data_path.
    :rtype: pd.DataFrame
    """
    # if data_dir is a file, then just load it into pandas
    if os.path.isfile(data_path):
        print("Loading '{}' into a dataframe".format(data_path))
        df = pd.read_csv(data_path)  # the first row of the exported CSV is the header
    elif os.path.isdir(data_path):
        files = [os.path.join(data_path, fi) for fi in os.listdir(data_path)]
        print("Loading {} into a single dataframe".format(files))
        df = pd.concat((pd.read_csv(s) for s in files))
    else:
        raise ValueError(
            "Please enter a valid path.  {} does not exist.".format(data_path)
        )

    return df


if __name__ == '__main__':
    PROJECT_ID = "my-project"
    DATASET_ID = "bq_dataset"
    TABLE_ID = "bq_table"
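    # the trailing wildcard lets BigQuery shard exports larger than 1 GB into multiple files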
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*"
    LOCAL_DATA_PATH = "/path/to/save/"

    bq_to_df(PROJECT_ID, DATASET_ID, TABLE_ID, STORAGE_URI, LOCAL_DATA_PATH)
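
(Depending on library versions, there may also be a shortcut that stays inside pandas: newer pandas / pandas-gbq releases accept a configuration argument on read_gbq that takes the raw BigQuery job configuration, so allowLargeResults can be passed directly. This is a hedged sketch, assuming a pandas-gbq version that supports configuration; the table names below are placeholders, and allowLargeResults still requires legacy SQL plus a destination table.)

import pandas as pd

# raw BigQuery job configuration passed through to the query job
config = {
    "query": {
        "allowLargeResults": True,
        "destinationTable": {
            "projectId": "my-project",
            "datasetId": "bq_dataset",
            "tableId": "large_results_staging",
        },
        "writeDisposition": "WRITE_TRUNCATE",
    }
}

df = pd.read_gbq(
    "SELECT column1, column2 FROM [bq_dataset.bq_table]",
    project_id="my-project",
    dialect="legacy",
    configuration=config,
)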
