Streaming in / chunking CSVs from S3 to Python


Problem description

I intend to perform some memory-intensive operations on a very large CSV file stored in S3 using Python, with the intention of moving the script to AWS Lambda. I know I can read the whole CSV into memory, but with such a large file I will definitely run into Lambda's memory and storage limits. Is there any way to stream in, or read in chunks of, a CSV at a time into Python using boto3/botocore, ideally by specifying row numbers to read in?

Here are some things I've already tried:

1) Using the Range parameter of S3.get_object to specify the range of bytes to read in. Unfortunately this means the last row gets cut off in the middle, since there is no way to specify the number of rows to read. There are some messy workarounds, like scanning for the last newline character, recording its index, and then using that as the starting point for the next byte range, but I'd like to avoid this clunky solution if possible (a sketch of this approach follows after the list).

2) Using S3 Select to write SQL queries that selectively retrieve data from S3 buckets. Unfortunately the row_numbers SQL function isn't supported, and it doesn't look like there's a way to read in a subset of rows (also sketched below).
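
For reference, a minimal sketch of approach 1), assuming bucket and key name an existing uncompressed CSV object; the Range value follows HTTP byte-range syntax:

import boto3

s3 = boto3.client('s3')

# Fetch only the first ~1 MB of the object (inclusive byte range).
# The last line in this window will usually be cut off mid-row.
resp = s3.get_object(Bucket=bucket, Key=key, Range='bytes=0-999999')
data = resp['Body'].read()

# Clunky workaround: keep only complete rows and remember where the
# next Range request should continue from.
last_newline = data.rfind(b'\n')
complete_rows = data[:last_newline + 1].decode('utf-8')
next_start = last_newline + 1  # starting offset for the next request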
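
And a minimal sketch of approach 2) using select_object_content; the column name some_column is purely illustrative, and the query can only filter rows by value, not select them by position:

import boto3

s3 = boto3.client('s3')

resp = s3.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    # Filters rows by value; there is no supported way to ask for
    # "rows 1000-2000" by row number.
    Expression="SELECT * FROM s3object s WHERE s.\"some_column\" = 'foo'",
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE'}},
    OutputSerialization={'CSV': {}},
)

for event in resp['Payload']:
    if 'Records' in event:
        # matching rows arrive as CSV-formatted bytes
        records = event['Records']['Payload'].decode('utf-8')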

Recommended answer

Assuming your file isn't compressed, this should involve reading from a stream and splitting on the newline character: read a chunk of data, find the last instance of the newline character in that chunk, split, and process.

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data on (bytes, not string)
newline = b'\n'
partial_chunk = b''

while True:
    data = body.read(chunk_size)
    chunk = partial_chunk + data

    # if nothing was read and nothing is left over, we're done
    if chunk == b'':
        break

    if data == b'':
        # end of stream: process everything that's left, even without a
        # trailing newline, so the final row isn't dropped
        last_newline = len(chunk) - 1
    else:
        last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[:last_newline + 1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline + 1:]

If you have gzipped files, then you need to use BytesIO and the GzipFile class inside the loop; it's a harder problem because you need to retain the gzip compression details.
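
As a rough sketch of one way to handle the compressed case (wrapping the whole streaming body rather than decompressing chunk by chunk inside the loop, and assuming the object is a single gzip stream):

import gzip
import io

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# Wrap the streaming body so gzip decompresses it on the fly, then
# decode bytes to text line by line without loading the whole file
# into memory.
with gzip.GzipFile(fileobj=body) as gz:
    for line in io.TextIOWrapper(gz, encoding='utf-8'):
        pass  # process each decompressed CSV row here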
