从 S3 读取 ZIP 文件,无需下载整个文件 [英] Read ZIP files from S3 without downloading the entire file
问题描述
我们有大小为 5-10GB 的 ZIP 文件.典型的 ZIP 文件有 5-10 个内部文件,每个未压缩的大小为 1-5 GB.
We have ZIP files that are 5-10GB in size. The typical ZIP file has 5-10 internal files, each 1-5 GB in size uncompressed.
我有一套很好的 Python 工具来读取这些文件.基本上,我可以打开一个文件名,如果有 ZIP 文件,工具会在 ZIP 文件中搜索,然后打开压缩文件.这一切都相当透明.
I have a nice set of Python tools for reading these files. Basically, I can open a filename and if there is a ZIP file, the tools search in the ZIP file and then open the compressed file. It's all rather transparent.
我想将这些文件作为压缩文件存储在 Amazon S3 中.我可以获取 S3 文件的范围,所以应该可以获取 ZIP 中央目录(它是文件的结尾,所以我可以读取最后的 64KiB),找到我想要的组件,下载它,然后直接流式传输到调用过程.
I want to store these files in Amazon S3 as compressed files. I can fetch ranges of S3 files, so it should be possible to fetch the ZIP central directory (it's the end of the file, so I can just read the last 64KiB), find the component I want, download that, and stream directly to the calling process.
所以我的问题是,如何通过标准 Python ZipFile API 做到这一点?没有记录如何用支持 POSIX 语义的任意对象替换文件系统传输.这可能不重写模块吗?
So my question is, how do I do that through the standard Python ZipFile API? It isn't documented how to replace the filesystem transport with an arbitrary object that supports POSIX semantics. Is this possible without rewriting the module?
推荐答案
所以这里的代码允许您在 Amazon S3 上像打开普通文件一样打开文件.请注意,我使用了 aws
命令,而不是 boto3
Python 模块.(我无权访问 boto3.)您可以打开该文件并对其进行查找.该文件在本地缓存.如果您使用 Python ZipFile API 打开该文件并且它是一个 ZipFile,则您可以读取各个部分.但是,您不能写入,因为 S3 不支持部分写入.
So here is the code that allows you to open a file on Amazon S3 as if it were a normal file. Notice I use the aws
command, rather than the boto3
Python module. (I don't have access to boto3.) You can open the file and seek on it. The file is cached locally. If you open the file with the Python ZipFile API and it's a ZipFile, you can then read individual parts. You can't write, though, because S3 doesn't support partial writes.
另外,我实现了s3open()
,它可以打开一个文件进行读取或写入,但是它没有实现ZipFile所需的seek接口.
Separately, I implement s3open()
, which can open a file for reading or writing, but it doesn't implement the seek interface, which is required by ZipFile.
from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile
# Tools for reading and write files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.
def s3open(path, mode="r", encoding=None):
"""
Open an s3 file for reading or writing. Can handle any size, but cannot seek.
We could use boto.
http://boto.cloudhackers.com/en/latest/s3_tut.html
but it is easier to use the aws cli, since it is present and more likely to work.
"""
from subprocess import run,PIPE,Popen
if "b" in mode:
assert encoding == None
else:
if encoding==None:
encoding="utf-8"
assert 'a' not in mode
assert '+' not in mode
if "r" in mode:
p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding)
return p.stdout
elif "w" in mode:
p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding)
return p.stdin
else:
raise RuntimeError("invalid mode:{}".format(mode))
CACHE_SIZE=4096 # big enough for front and back caches
MAX_READ=65536*16
debug=False
class S3File:
"""Open an S3 file that can be seeked. This is done by caching to the local file system."""
def __init__(self,name,mode='rb'):
self.name = name
self.url = urlparse(name)
if self.url.scheme != 's3':
raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
self.bucket = self.url.netloc
self.key = self.url.path[1:]
self.fpos = 0
self.tf = tempfile.NamedTemporaryFile()
cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
file_info = data['Contents'][0]
self.length = file_info['Size']
self.ETag = file_info['ETag']
# Load the caches
self.frontcache = self._readrange(0,CACHE_SIZE) # read the first 1024 bytes and get length of the file
if self.length > CACHE_SIZE:
self.backcache_start = self.length-CACHE_SIZE
if debug: print("backcache starts at {}".format(self.backcache_start))
self.backcache = self._readrange(self.backcache_start,CACHE_SIZE)
else:
self.backcache = None
def _readrange(self,start,length):
# This is gross; we copy everything to the named temporary file, rather than a pipe
# because the pipes weren't showing up in /dev/fd/?
# We probably want to cache also... That's coming
cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
'--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
if debug:print(cmd)
data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
if debug:print(data)
self.tf.seek(0) # go to the beginning of the data just read
return self.tf.read(length) # and read that much
def __repr__(self):
return "FakeFile<name:{} url:{}>".format(self.name,self.url)
def read(self,length=-1):
# If length==-1, figure out the max we can read to the end of the file
if length==-1:
length = min(MAX_READ, self.length - self.fpos + 1)
if debug:
print("read: fpos={} length={}".format(self.fpos,length))
# Can we satisfy from the front cache?
if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE:
if debug:print("front cache")
buf = self.frontcache[self.fpos:self.fpos+length]
self.fpos += len(buf)
if debug:print("return 1: buf=",buf)
return buf
# Can we satisfy from the back cache?
if self.backcache and (self.length - CACHE_SIZE < self.fpos):
if debug:print("back cache")
buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length]
self.fpos += len(buf)
if debug:print("return 2: buf=",buf)
return buf
buf = self._readrange(self.fpos, length)
self.fpos += len(buf)
if debug:print("return 3: buf=",buf)
return buf
def seek(self,offset,whence=0):
if debug:print("seek({},{})".format(offset,whence))
if whence==0:
self.fpos = offset
elif whence==1:
self.fpos += offset
elif whence==2:
self.fpos = self.length + offset
else:
raise RuntimeError("whence={}".format(whence))
if debug:print(" ={} (self.length={})".format(self.fpos,self.length))
def tell(self):
return self.fpos
def write(self):
raise RuntimeError("Write not supported")
def flush(self):
raise RuntimeError("Flush not supported")
def close(self):
return
这篇关于从 S3 读取 ZIP 文件,无需下载整个文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!