How to read file chunk by chunk from S3 using aws-java-sdk


Question

I am trying to read a large file from S3 in chunks, without splitting any line, so the chunks can be processed in parallel.

Let me explain by example: there is a file of size 1 GB on S3. I want to divide this file into chunks of 64 MB. That part is easy; I can do it like this:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64 * 1024 * 1024];
int bytesRead;

// read() may return fewer bytes than the buffer holds, so keep the count
while ((bytesRead = stream.read(content)) != -1) {

    // process content[0..bytesRead) here

}

The problem is that a chunk may contain 100 complete lines and one incomplete line. I cannot process an incomplete line, and I don't want to discard it.

Is there any way to handle this situation, so that every chunk contains only whole lines?
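One common technique for this (a minimal sketch, not from the answer; `ChunkSplitter` and `lastLineEnd` are hypothetical names) is to cut each chunk at its last newline and carry the trailing partial line over to the front of the next chunk:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: cut a chunk at its last '\n' and carry the trailing
// partial line over to the next chunk, so each unit handed to a parallel
// worker contains only whole lines.
public class ChunkSplitter {

    // Returns the index just past the last '\n' within the first `length`
    // bytes of `chunk`, or 0 if the chunk contains no newline at all.
    static int lastLineEnd(byte[] chunk, int length) {
        for (int i = length - 1; i >= 0; i--) {
            if (chunk[i] == '\n') {
                return i + 1;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        byte[] chunk = "line1\nline2\npartial".getBytes(StandardCharsets.UTF_8);
        int cut = lastLineEnd(chunk, chunk.length);

        // Bytes [0, cut) are complete lines; bytes [cut, length) are the
        // carry-over to prepend to the next chunk read from the stream.
        String complete = new String(chunk, 0, cut, StandardCharsets.UTF_8);
        String carry = new String(chunk, cut, chunk.length - cut, StandardCharsets.UTF_8);

        System.out.println(complete); // prints "line1" and "line2"
        System.out.println(carry);    // prints "partial"
    }
}
```

The carry-over bytes are simply copied to the start of the reusable buffer before the next read fills in behind them.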

Answer

My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here because the underlying S3ObjectInputStream eventually times out for huge files.

So I created a new class, S3InputStream, which doesn't care how long it stays open and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused; new byte[1 << 24] (16 MB) appears to work well.

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        // mask to an unsigned byte, per the InputStream.read() contract
        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        if (offset > lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int bytesRead;

                // Drain this ranged GET into the buffer with bulk reads.
                while (length < buffer.length
                        && (bytesRead = inputStream.read(buffer, length, buffer.length - length)) != -1) {
                    length += bytesRead;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        // Ranged GET: fetch at most buffer.length bytes starting at offset.
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
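The class above can then be plugged back into the BufferedReader.lines() pipeline the answer mentions. A minimal sketch of that usage, with a ByteArrayInputStream standing in for new S3InputStream(bucket, key, new byte[1 << 24]) so it runs without AWS credentials:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class S3InputStreamUsage {
    public static void main(String[] args) throws IOException {
        // Stand-in for: new S3InputStream("my-bucket", "my-key", new byte[1 << 24])
        InputStream in = new ByteArrayInputStream(
                "first\nsecond\nthird\n".getBytes(StandardCharsets.UTF_8));

        // BufferedReader finds the line boundaries, so no batch of lines
        // handed to a worker ever contains a partial line.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            List<String> lines = reader.lines().collect(Collectors.toList());
            System.out.println(lines.size()); // prints 3
        }
    }
}
```

Because S3InputStream re-fetches each block with a fresh ranged GET, the reader can take arbitrarily long between lines without the connection timing out.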
