How to read file chunk by chunk from S3 using aws-java-sdk

Question
I am trying to read a large file from S3 in chunks, without cutting any line, so the chunks can be processed in parallel.

Let me explain by example: there is a file of size 1 GB on S3. I want to divide this file into chunks of 64 MB. That part is easy; I can do it like this:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64 * 1024 * 1024];
int bytesRead;
while ((bytesRead = stream.read(content)) != -1) {
    // process the first bytesRead bytes of content here;
    // read() may return fewer bytes than the buffer holds
}
The problem is that a chunk may contain, say, 100 complete lines and one incomplete line at the end. I cannot process an incomplete line, but I don't want to discard it either.

Is there any way to handle this situation, so that every chunk contains only whole lines?
Answer
My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here, because the underlying S3ObjectInputStream eventually times out for huge files.
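For context, the batching stage of that usual pipeline can be sketched as follows; the class and helper names here are illustrative, not part of any library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BatchPipeline {
    // Groups lines into fixed-size batches; each batch is then handed to
    // CompletableFuture.supplyAsync for parallel processing.
    static List<List<String>> batches(List<String> lines, int batchSize) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += batchSize) {
            result.add(lines.subList(i, Math.min(i + batchSize, lines.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b", "c", "d", "e");
        // One future per batch; here each "processes" a batch by counting it.
        List<CompletableFuture<Integer>> futures = batches(lines, 2).stream()
                .map(batch -> CompletableFuture.supplyAsync(batch::size))
                .collect(Collectors.toList());
        int total = futures.stream()
                .map(CompletableFuture::join)
                .mapToInt(Integer::intValue)
                .sum();
        System.out.println(total); // 5
    }
}
```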
So I created a new class, S3InputStream, which doesn't care how long it stays open and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused; new byte[1 << 24] (16 MB) appears to work well.
package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        // Mask to 0..255: returning the raw signed byte would make any byte
        // >= 0x80 negative, and a -1 would be misread as end of stream.
        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            // Each fill issues a fresh short-lived ranged GET, so no single
            // connection stays open long enough to time out.
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
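With a stream like this in place, getting only complete lines is simply a matter of wrapping it in a BufferedReader and using lines(). The sketch below demonstrates that with a plain in-memory stream as a stand-in; in production you would pass new S3InputStream(bucket, key, new byte[1 << 24]) instead (the bucket and key names would be your own):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class LineReaderDemo {
    // Consumes complete lines from any InputStream; BufferedReader handles
    // the line boundaries, so no chunk arithmetic is needed at all.
    static long countLines(InputStream in) throws IOException {
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for: new S3InputStream("my-bucket", "big-file.txt", new byte[1 << 24])
        InputStream demo = new ByteArrayInputStream(
                "line1\nline2\nline3".getBytes(StandardCharsets.UTF_8));
        System.out.println(countLines(demo)); // 3
    }
}
```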