从S3读取ZIP文件,而无需下载整个文件 [英] Read ZIP files from S3 without downloading the entire file

查看:116
本文介绍了从S3读取ZIP文件,而无需下载整个文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有5-10GB的ZIP文件.典型的ZIP文件有5-10个内部文件,每个1-5 GB的未压缩大小.

We have ZIP files that are 5-10GB in size. The typical ZIP file has 5-10 internal files, each 1-5 GB in size uncompressed.

我有一套不错的Python工具来读取这些文件.基本上,我可以打开一个文件名,如果有一个ZIP文件,则该工具会在ZIP文件中进行搜索,然后打开压缩文件.这都是相当透明的.

I have a nice set of Python tools for reading these files. Basically, I can open a filename and if there is a ZIP file, the tools search in the ZIP file and then open the compressed file. It's all rather transparent.

我想将这些文件作为压缩文件存储在Amazon S3中.我可以提取S3文件的范围,因此应该可以提取ZIP中央目录(这是文件的末尾,因此我只能读取最后一个64KiB),找到所需的组件,下载该组件,然后直接流式传输到调用过程.

I want to store these files in Amazon S3 as compressed files. I can fetch ranges of S3 files, so it should be possible to fetch the ZIP central directory (it's the end of the file, so I can just read the last 64KiB), find the component I want, download that, and stream directly to the calling process.

所以我的问题是,如何通过标准的Python ZipFile API做到这一点?没有记录如何使用支持POSIX语义的任意对象替换文件系统传输.不用重写模块就可以吗?

So my question is, how do I do that through the standard Python ZipFile API? It isn't documented how to replace the filesystem transport with an arbitrary object that supports POSIX semantics. Is this possible without rewriting the module?

推荐答案

因此,以下代码使您可以像正常文件一样在Amazon S3上打开文件.注意,我使用的是aws命令,而不是boto3 Python模块. (我无权访问boto3.)您可以打开文件并对其进行搜索.该文件在本地缓存.如果您使用Python ZipFile API打开文件并且它是ZipFile,则可以读取各个部分.但是,您无法编写,因为S3不支持部分写入.

So here is the code that allows you to open a file on Amazon S3 as if it were a normal file. Notice I use the aws command, rather than the boto3 Python module. (I don't have access to boto3.) You can open the file and seek on it. The file is cached locally. If you open the file with the Python ZipFile API and it's a ZipFile, you can then read individual parts. You can't write, though, because S3 doesn't support partial writes.

另外,我实现了s3open(),它可以打开一个文件进行读取或写入,但是它没有实现ZipFile.

Separately, I implement s3open(), which can open a file for reading or writing, but it doesn't implement the seek interface, which is required by ZipFile.

from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile

# Tools for reading and write files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.

def s3open(path, mode="r", encoding=None):
    """
    Open an s3 file for reading or writing. Can handle any size, but cannot seek.
    We could use boto.
    http://boto.cloudhackers.com/en/latest/s3_tut.html
    but it is easier to use the aws cli, since it is present and more likely to work.
    """
    from subprocess import run,PIPE,Popen
    if "b" in mode:
        assert encoding == None
    else:
        if encoding==None:
            encoding="utf-8"
    assert 'a' not in mode
    assert '+' not in mode

    if "r" in mode:
        p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding)
        return p.stdout

    elif "w" in mode:
        p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding)
        return p.stdin
    else:
        raise RuntimeError("invalid mode:{}".format(mode))




CACHE_SIZE=4096                 # big enough for front and back caches
MAX_READ=65536*16
debug=False
class S3File:
    """Open an S3 file that can be seeked. This is done by caching to the local file system."""
    def __init__(self,name,mode='rb'):
        self.name   = name
        self.url    = urlparse(name)
        if self.url.scheme != 's3':
            raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
        self.bucket = self.url.netloc
        self.key    = self.url.path[1:]
        self.fpos   = 0
        self.tf     = tempfile.NamedTemporaryFile()
        cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        file_info = data['Contents'][0]
        self.length = file_info['Size']
        self.ETag   = file_info['ETag']

        # Load the caches

        self.frontcache = self._readrange(0,CACHE_SIZE) # read the first 1024 bytes and get length of the file
        if self.length > CACHE_SIZE:
            self.backcache_start = self.length-CACHE_SIZE
            if debug: print("backcache starts at {}".format(self.backcache_start))
            self.backcache  = self._readrange(self.backcache_start,CACHE_SIZE)
        else:
            self.backcache  = None

    def _readrange(self,start,length):
        # This is gross; we copy everything to the named temporary file, rather than a pipe
        # because the pipes weren't showing up in /dev/fd/?
        # We probably want to cache also... That's coming
        cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
               '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
        if debug:print(cmd)
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        if debug:print(data)
        self.tf.seek(0)         # go to the beginning of the data just read
        return self.tf.read(length) # and read that much

    def __repr__(self):
        return "FakeFile<name:{} url:{}>".format(self.name,self.url)

    def read(self,length=-1):
        # If length==-1, figure out the max we can read to the end of the file
        if length==-1:
            length = min(MAX_READ, self.length - self.fpos + 1)

        if debug:
            print("read: fpos={}  length={}".format(self.fpos,length))
        # Can we satisfy from the front cache?
        if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE:
            if debug:print("front cache")
            buf = self.frontcache[self.fpos:self.fpos+length]
            self.fpos += len(buf)
            if debug:print("return 1: buf=",buf)
            return buf

        # Can we satisfy from the back cache?
        if self.backcache and (self.length - CACHE_SIZE < self.fpos):
            if debug:print("back cache")
            buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length]
            self.fpos += len(buf)
            if debug:print("return 2: buf=",buf)
            return buf

        buf = self._readrange(self.fpos, length)
        self.fpos += len(buf)
        if debug:print("return 3: buf=",buf)
        return buf

    def seek(self,offset,whence=0):
        if debug:print("seek({},{})".format(offset,whence))
        if whence==0:
            self.fpos = offset
        elif whence==1:
            self.fpos += offset
        elif whence==2:
            self.fpos = self.length + offset
        else:
            raise RuntimeError("whence={}".format(whence))
        if debug:print("   ={}  (self.length={})".format(self.fpos,self.length))

    def tell(self):
        return self.fpos

    def write(self):
        raise RuntimeError("Write not supported")

    def flush(self):
        raise RuntimeError("Flush not supported")

    def close(self):
        return

这篇关于从S3读取ZIP文件,而无需下载整个文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆