Python read .json files from GCS into pandas DF in parallel

Problem description

TL;DR: asyncio vs. multiprocessing vs. threading vs. some other solution to parallelize a for loop that reads files from GCS, appends this data together into a pandas dataframe, and then writes it to BigQuery...

I'd like to make parallel a python function that reads hundreds of thousands of small .json files from a GCS directory, then converts those .jsons into pandas dataframes, and then writes the pandas dataframes to a BigQuery table.

Here is a non-parallel version of the function:

import gcsfs
import json
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):

    # my own function to get list of filenames from GCS directory
    files = get_gcs_file_list(directory=gcs_directory)

    # Create new table
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
    counter = 0
    for file in files:

        # read files from GCS
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            output_df = output_df.append(this_df)

        # Write to BigQuery for every 5K rows of data
        counter += 1
        if (counter % 5000 == 0):
            pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
            output_df = pd.DataFrame() # and reset the dataframe


    # Write remaining rows to BigQuery
    pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')

This function is straightforward:
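Since the TL;DR asks about asyncio vs. multiprocessing vs. threading, here is a minimal sketch (under assumptions, not a definitive answer) of one way to parallelize the I/O-bound GCS reads with a thread pool. It reuses the question's own get_gcs_file_list helper and the batch-every-5000-files logic; the project_id parameter is a hypothetical stand-in for the question's my_id.

import json
from concurrent.futures import ThreadPoolExecutor

import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list  # the question's own helper, assumed available

def read_one(fs, file):
    # Read a single .json file from GCS and return its contents as a DataFrame
    with fs.open(file, 'r') as f:
        gcs_data = json.loads(f.read())
    data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
    return pd.DataFrame(data)

def load_gcs_to_bq_parallel(gcs_directory, bq_table, project_id,
                            max_workers=32, batch_size=5000):
    files = get_gcs_file_list(directory=gcs_directory)
    fs = gcsfs.GCSFileSystem()

    batch = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() keeps up to max_workers reads in flight and yields results in order
        for this_df in pool.map(lambda name: read_one(fs, name), files):
            batch.append(this_df)
            # Flush to BigQuery every batch_size files, as in the original loop
            if len(batch) >= batch_size:
                pd.concat(batch).to_gbq(bq_table, project_id=project_id, if_exists='append')
                batch = []

    # Write any remaining rows to BigQuery
    if batch:
        pd.concat(batch).to_gbq(bq_table, project_id=project_id, if_exists='append')

Threads fit here because the gcsfs reads are network-bound; if JSON parsing itself became the bottleneck, a ProcessPoolExecutor (or splitting the file list across processes) would be the usual next step.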
