Python read .json files from GCS into pandas DF in parallel
Question
TL;DR: asyncio vs multiprocessing vs threading vs. some other solution, to parallelize a for loop that reads files from GCS, then appends this data together into a pandas dataframe, then writes to BigQuery...
I'd like to parallelize a python function that reads hundreds of thousands of small .json files from a GCS directory, converts those .jsons into pandas dataframes, and then writes the pandas dataframes to a BigQuery table.
Here is a non-parallel version of the function:
import json  # needed for json.loads below
import gcsfs
import pandas as pd

from my.helpers import get_gcs_file_list

def load_gcs_to_bq(gcs_directory, bq_table):
    # my own function to get list of filenames from GCS directory
    files = get_gcs_file_list(directory=gcs_directory)

    # Create new table
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem()  # Google Cloud Storage (GCS) File System (FS)
    counter = 0
    for file in files:
        # read files from GCS
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
        data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
        this_df = pd.DataFrame(data)
        output_df = output_df.append(this_df)

        # Write to BigQuery every 5K files
        counter += 1
        if counter % 5000 == 0:
            # my_id is my GCP project id, defined elsewhere
            pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
            output_df = pd.DataFrame()  # and reset the dataframe

    # Write remaining rows to BigQuery
    pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
This function is fairly straightforward:
- grab ['gcs_dir/file1.json', 'gcs_dir/file2.json', ...], the list of file names in GCS
- loop over each file name, and:
  - read the file from GCS
  - convert the data into a pandas DF
  - append it to a main pandas DF
  - every 5K loops, write to BigQuery, since the appends get much slower as the DF gets larger (see the sketch after this list)
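That last point is a known pandas pitfall: each output_df.append() copies the entire accumulated frame, so the loop's cost grows quadratically with rows (and DataFrame.append was removed outright in pandas 2.0). A minimal sketch of the same 5K batching that instead collects per-file frames in a list and calls pd.concat once per batch; the function name and batch_size parameter are illustrative, not from the original:

import json

import gcsfs
import pandas as pd

def load_gcs_to_bq_concat(files, bq_table, project_id, batch_size=5000):
    fs = gcsfs.GCSFileSystem()
    frames = []  # small per-file DataFrames, concatenated once per batch
    for path in files:
        with fs.open(path, 'r') as f:
            gcs_data = json.loads(f.read())
        data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
        frames.append(pd.DataFrame(data))
        if len(frames) == batch_size:
            pd.concat(frames, ignore_index=True).to_gbq(
                bq_table, project_id=project_id, if_exists='append')
            frames = []
    if frames:  # flush the remainder
        pd.concat(frames, ignore_index=True).to_gbq(
            bq_table, project_id=project_id, if_exists='append')

The per-batch concat keeps each BigQuery write the same size as before while avoiding the repeated full-frame copies.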
I have to run this function on a few GCS directories each with ~500K files. Due to the bottleneck of reading/writing this many small files, this process will take ~24 hours for a single directory... It would be great if I could make this more parallel to speed things up, as it seems like a task that lends itself to parallelization.
Edit: The solutions below are helpful, but I am particularly interested in running in parallel from within the python script. Pandas is handling some data cleaning, and using bq load will throw errors. There is asyncio and this gcloud-aio-storage that both seem potentially useful for this task, maybe as better options than threading or multiprocessing...
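For a sense of what running in parallel from within the python script could look like: the loop is I/O-bound (network reads), so a thread pool is a natural first candidate. A minimal sketch with concurrent.futures.ThreadPoolExecutor, reusing get_gcs_file_list from above; the workers count, batch_size, and helper names are assumptions for illustration, not a tested recommendation:

import json
from concurrent.futures import ThreadPoolExecutor

import gcsfs
import pandas as pd

from my.helpers import get_gcs_file_list

def read_one(fs, path):
    # fetch and parse a single .json file; always return a list of records
    with fs.open(path, 'r') as f:
        gcs_data = json.loads(f.read())
    return [gcs_data] if isinstance(gcs_data, dict) else gcs_data

def load_gcs_to_bq_threaded(gcs_directory, bq_table, project_id,
                            workers=32, batch_size=5000):
    files = get_gcs_file_list(directory=gcs_directory)
    fs = gcsfs.GCSFileSystem()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for start in range(0, len(files), batch_size):
            batch = files[start:start + batch_size]
            # download this batch concurrently; one list of records per file
            rows = []
            for records in pool.map(lambda p: read_one(fs, p), batch):
                rows.extend(records)
            pd.DataFrame(rows).to_gbq(bq_table, project_id=project_id,
                                      if_exists='append')

An asyncio version with gcloud-aio-storage would follow the same shape, with the thread pool replaced by a gather over coroutines.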
Answer
Rather than add parallel processing to your python code, consider invoking your python program multiple times in parallel. This is a trick that lends itself more easily to a program that takes a list of files on the command line. So, for the sake of this post, let's consider changing one line in your program:
Your line:
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory)
New line:
files = sys.argv[1:] # ok, import sys, too
Now, you can invoke your program this way:
PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program
xargs will now take the file names output by get_gcs_file_list.py and invoke your_program up to 100 times in parallel, fitting as many file names as it can on each line. I believe the number of file names is limited to the maximum command size allowed by the shell. If 100 processes is not enough to process all your files, xargs will invoke your_program again (and again) until all file names it reads from stdin are processed. xargs ensures that no more than 100 invocations of your_program are running simultaneously. You can vary the number of processes based on the resources available to your host.
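One detail this pipeline assumes: get_gcs_file_list.py must print one file name per line to stdout for xargs to split on. A possible sketch, reusing the helper from the question; taking the GCS directory as a command-line argument is an illustrative choice, not from the original:

#!/usr/bin/env python
# get_gcs_file_list.py -- emit one GCS file name per line for xargs
import sys

from my.helpers import get_gcs_file_list  # the question's own helper

if __name__ == '__main__':
    # assumption: the GCS directory to list is passed as the first argument
    for name in get_gcs_file_list(directory=sys.argv[1]):
        print(name)

If the shell's command-size limit is a concern, xargs -n caps how many names each invocation receives (e.g. xargs -n 500 -P $PROCESSES your_program), at the cost of more invocations.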