Google Storage python API download in parallel


Question

Downloading a large number of files from Google Storage to a local machine in parallel is trivial with gsutil by adding the -m flag:

gsutil -m cp gs://my-bucket/blob_prefix* .

In python, I only know how to download one file at a time:

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket(gs_bucket_name)
blobs = [blob for blob in bucket.list_blobs(prefix=blob_prefix)]
for blob in blobs:
    blob.download_to_filename(blob.name)  # one blob at a time, saved to local disk

Preferably I would like to download the data directly into memory (similar to blob.download_as_string()), and preferably into a generator. The order does not matter.

Does this functionality exist somewhere in the python API?
If not, what would be the best way to do this?
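
For reference, a minimal sequential sketch of the "directly into memory, into a generator" version described above, assuming the blobs contain JSON (as in the edit below); the helper name iter_blobs_in_memory is purely illustrative:

from google.cloud import storage
import json


def iter_blobs_in_memory(bucket_name, blob_prefix):
    """Illustrative only: yield each blob's parsed content, one blob at a time."""
    bucket = storage.Client().get_bucket(bucket_name)
    for blob in bucket.list_blobs(prefix=blob_prefix):
        yield json.loads(blob.download_as_string())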

Edit

I have implemented this hack:

import os
import shutil
import subprocess
import json


def fetch_data_from_storage(fetch_pattern):
    """Download blobs to a local tmp dir first, then yield their contents as a generator."""
    tmp_save_dir = os.path.join("/tmp", "tmp_gs_download")
    if os.path.isdir(tmp_save_dir):
        shutil.rmtree(tmp_save_dir)  # empty the tmp dir first
    os.makedirs(tmp_save_dir)  # create the tmp dir

    # `bucket` is a module-level google.cloud.storage Bucket, as in the snippet above
    download_command = ["gsutil", "-m", "cp", "gs://{}/{}".format(bucket.name, fetch_pattern), tmp_save_dir]
    subprocess.check_call(download_command)  # raises if the gsutil download fails

    for file in os.listdir(tmp_save_dir):
        with open(os.path.join(tmp_save_dir, file), 'r') as f_data:
            content = json.load(f_data)
            yield content
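
It is consumed roughly like this; process() is only a placeholder for whatever handles each parsed document:

for content in fetch_data_from_storage("blob_prefix*"):
    process(content)  # placeholder: handle one parsed JSON document at a time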

Please advise if this has been implemented somewhere in a better way.

Answer

OK, here is my multi-processed, multi-threaded solution. It works as follows:

  1. Use subprocess and gsutil ls -l pattern to get a list of blob names and their file sizes. The pattern is the one entered in __main__.
  2. Batches of blob names are created based on a maximum batch size (default 1 MB), so a large file will simply form a batch of 1.
  3. Each batch is sent to a separate process. Default number of processes = cpu_count - 2.
  4. Each batch in each process is multithreaded (default max threads = 10). A batch of threads has to finish before the next batch is started.
  5. Each thread downloads a single blob and combines it with its metadata.
  6. Results are propagated upwards through shared resources and memory allocation.

The reasons I wrote this:

  • I also needed the metadata, which is lost when using gsutil (critical)
  • I wanted some retry control in case some parts fail (minor)

Speed comparison for ~2500 small (<50 kB) files (= 55 MB in total):

  • One file at a time (including metadata): 25m13s
  • gsutil -m cp (without metadata): 0m35s
  • The code below (including metadata): 1m43s

from typing import Iterable, Generator
import logging
import json
import datetime
import re
import subprocess
import multiprocessing
import threading

from google.cloud import storage

logging.basicConfig(level='INFO')
logger = logging.getLogger(__name__)


class StorageDownloader:

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bucket = storage.Client().bucket(bucket_name)

    def create_blob_batches_by_pattern(self, fetch_pattern, max_batch_size=1e6):
        """Fetch all blob names according to the pattern and the blob size.

        :param fetch_pattern: The gsutil matching pattern for files we want to download.
          A gsutil pattern is used instead of blob prefix because it is more powerful.
        :type fetch_pattern: str
        :param max_batch_size: Maximum size per batch in bytes.  Default = 1 MB = 1e6 bytes
        :type max_batch_size: float or int
        :return: Generator of batches of blob names.
        :rtype: Generator of list
        """
        download_command = ["gsutil", "ls", "-l", "gs://{}/{}".format(self.bucket.name, fetch_pattern)]
        logger.info("Gsutil list command command: {}".format(download_command))
        blob_details_raw = subprocess.check_output(download_command).decode()
        # each data line of `gsutil ls -l` looks roughly like: "<size>  <YYYY-MM-DDTHH:MM:SSZ>  gs://bucket/blob_name"
        regexp = r"(\d+) +\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ +gs://\S+?/(\S+)"
        # re.finditer returns a generator so we don't duplicate memory need in case the string is quite large
        cum_batch_size = 0
        batch = []
        batch_nr = 1
        for reg_match in re.finditer(regexp, blob_details_raw):
            blob_name = reg_match.group(2)
            byte_size = int(reg_match.group(1))

            batch.append(blob_name)
            cum_batch_size += byte_size

            if cum_batch_size > max_batch_size:
                yield batch
                batch = []
                cum_batch_size = 0
                batch_nr += 1

        logger.info("Created {} batches with roughly max batch size = {} bytes".format(batch_nr, int(max_batch_size)))
        if batch:
            yield batch  # if we still have a batch left, then it must also be yielded

    @staticmethod
    def download_batch_into_memory(batch, bucket, include_metadata=True, max_threads=10):
        """Given a batch of storage filenames, download them into memory.

        Downloading the files in a batch is multithreaded.

        :param batch: A list of gs:// filenames to download.
        :type batch: list of str
        :param bucket: The google api bucket.
        :type bucket: google.cloud.storage.bucket.Bucket
        :param include_metadata: True to include the blob metadata in the result.
        :type include_metadata: bool
        :param max_threads: Number of threads to use for downloading batch.  Don't increase this over 10.
        :type max_threads: int
        :return: Complete blob contents and metadata.
        :rtype: dict
        """
        def download_blob(blob_name, state):
            """Standalone function so that we can multithread this."""
            blob = bucket.blob(blob_name=blob_name)
            content = json.loads(blob.download_as_string())
            if include_metadata:
                blob.reload()  # reload fetches the blob's properties, including its user metadata
                metadata = blob.metadata
                if metadata:
                    state[blob_name] = {**content, **metadata}
                    return
            state[blob_name] = content

        batch_data = {bn: {} for bn in batch}
        threads = []
        active_thread_count = 0
        for blobname in batch:
            thread = threading.Thread(target=download_blob, kwargs={"blob_name": blobname, "state": batch_data})
            threads.append(thread)
            thread.start()
            active_thread_count += 1
            if active_thread_count == max_threads:
                # finish up threads in batches of size max_threads.  A better implementation would be a queue
                #   from which the threads can feed, but this is good enough if the blob sizes are roughly the same.
                for thread in threads:
                    thread.join()
                threads = []
                active_thread_count = 0

        # wait for the last of the threads to be finished
        for thread in threads:
            thread.join()
        return batch_data

    def multiprocess_batches(self, batches, max_processes=None):
        """Spawn parallel process for downloading and processing batches.

        :param batches: An iterable of batches, probably a Generator.
        :type batches: Iterable
        :param max_processes: Maximum number of processes to spawn.  None defaults to cpu_count - 2.
        :type max_processes: int or None
        :return: The response from all the processes.
        :rtype: dict
        """
        if max_processes is None:
            max_processes = multiprocessing.cpu_count() - 2
            logger.info("Using {} processes to process batches".format(max_processes))

        def single_proc(mp_batch, mp_bucket, batchresults):
            """Standalone function so that we can multiprocess this."""
            # note: being a local closure, this target only works with the 'fork' start method (the Linux default)
            proc_res = self.download_batch_into_memory(mp_batch, mp_bucket)
            batchresults.update(proc_res)

        # one Process is spawned per batch; the number of concurrently running processes
        #   is not strictly capped by max_processes.
        batch_results = multiprocessing.Manager().dict()

        jobs = []
        for batch in batches:
            logger.info("Processing batch with {} elements".format(len(batch)))
            # the client is not thread safe, so we need to recreate the client for each process.
            bucket = storage.Client().get_bucket(self.bucket_name)
            proc = multiprocessing.Process(
                target=single_proc,
                kwargs={"mp_batch": batch, "mp_bucket": bucket, "batchresults": batch_results}
            )
            jobs.append(proc)
            proc.start()

        for job in jobs:
            job.join()

        logger.info("finished downloading {} blobs".format(len(batch_results)))
        return batch_results

    def bulk_download_as_dict(self, fetch_pattern):
        """Download blobs from google storage to

        :param fetch_pattern: A gsutil storage pattern.
        :type fetch_pattern: str
        :return: A dict with k,v pairs = {blobname: blob_data}
        :rtype: dict
        """
        start = datetime.datetime.now()
        filename_batches = self.create_blob_batches_by_pattern(fetch_pattern)
        downloaded_data = self.multiprocess_batches(filename_batches)
        logger.info("time taken to download = {}".format(datetime.datetime.now() - start))
        return downloaded_data


if __name__ == '__main__':
    stor = StorageDownloader("mybucket")
    data = stor.bulk_download_as_dict("some_prefix*")

This could still use a fair amount of optimization (e.g. queueing the threads instead of waiting for a whole block of them to finish), but for now it is good enough for me.
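
One possible direction for that optimization, sketched below under the assumption that the per-blob JSON parsing stays the same (metadata handling omitted for brevity): replace the manual thread batching in download_batch_into_memory with a concurrent.futures thread pool, which hands a worker the next blob as soon as it is free. The helper name download_batch_into_memory_pooled is illustrative only, not part of the code above.

from concurrent.futures import ThreadPoolExecutor
import json


def download_batch_into_memory_pooled(batch, bucket, max_threads=10):
    """Sketch: download a batch of blobs concurrently using a thread pool instead of manual thread batching."""
    def download_blob(blob_name):
        # each worker downloads and parses one blob
        return blob_name, json.loads(bucket.blob(blob_name).download_as_string())

    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # workers pick up the next blob as soon as they finish the previous one,
        # so one slow blob no longer holds up an entire round of threads
        return dict(pool.map(download_blob, batch))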
