S3 files being processed multiple times in AWS Lambda
Problem description
I have a Java Lambda function that is triggered by an S3 event every 15 minutes. I've noticed that over roughly every three-hour window, each Lambda invocation sees not just the most recently uploaded file but every file uploaded earlier in that window.
So when it iterates over the whole list, it reprocesses files that were already handled by previous Lambda invocations.
How can I process only the newest uploaded file? In node.js there is context.succeed(), which I assume marks the event as successfully processed; Java doesn't seem to have an equivalent.
The CloudWatch logs are below.
08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/08/31/20160831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/08/31/20160831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/08/31/20160831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/08/31/20160831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/08/31/20160831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/08/31/20160831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc
Edit 1: I believe the question has been answered by Michael, but here is some of the code for anyone else. I am indeed using a global list to hold the records.
公共类LambdaHandler {
private final List<GDELTEventFile> eventFiles = new ArrayList<>();
private AmazonS3Client s3Client;
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();
public void gdeltHandler(S3Event event, Context context) {
StopWatch sw = new StopWatch();
long time = 0L;
sw.start();
s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
sw.split();
System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
time += sw.getSplitTime();
sw.reset();
sw.start();
processEvent(event);
sw.split();
System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
time += sw.getSplitTime();
sw.reset();
sw.start();
processFiles();
sw.split();
System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
time += sw.getSplitTime();
System.out.println("Found " + eventFiles.size() + " event files");
System.out.println("Total function took: " + time + "ms");
}
private void processEvent(S3Event event) {
List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
for (S3EventNotification.S3EventNotificationRecord record : records) {
long filesize = record.getS3().getObject().getSizeAsLong();
eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize));
}
}
private void processFiles() {
for (GDELTEventFile event : eventFiles) {
try {
System.out.println(event.getBucket() + " :: " + event.getFilename());
GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
S3Object file = s3Client.getObject(request);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
CSVParser parser = new CSVParser(reader, CSV_FORMAT);
int count = 0;
for (CSVRecord record : parser) {
count++;
}
}
System.out.println("Processed " + count + " events");
}
} catch (IOException ioe) {
System.out.println("IOException :: " + ioe);
}
}
}
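The behaviour in the logs can be reproduced without AWS at all. In a warm container, Lambda reuses the same handler object across invocations, so any instance field survives between calls. A minimal sketch of the same pattern (plain Java, no SDK; the class and method names are illustrative, not part of the original code):

```java
import java.util.ArrayList;
import java.util.List;

public class WarmContainerDemo {
    // Mirrors the handler above: per-invocation data kept in an instance field.
    private final List<String> eventFiles = new ArrayList<>();

    // Each "invocation" appends its S3 keys, then processes the whole list.
    public int handle(List<String> newKeys) {
        eventFiles.addAll(newKeys);
        return eventFiles.size(); // number of files this invocation would process
    }

    public static void main(String[] args) {
        // A warm container services consecutive invocations with ONE instance,
        // so the list accumulates exactly like "Found 1 / 2 / 3 event files".
        WarmContainerDemo warm = new WarmContainerDemo();
        System.out.println(warm.handle(List.of("0831123000.export.csv"))); // 1
        System.out.println(warm.handle(List.of("0831124500.export.csv"))); // 2
        System.out.println(warm.handle(List.of("0831130000.export.csv"))); // 3
    }
}
```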
Answer
This is a case of code that overlooks an important aspect of Lambda's container reuse -- container reuse in Lambda includes process reuse. When a function is executed in a reused container, it's also necessarily running in the same process that was used before. S3's event notification data structure is such that it can include more than one object per event, but in practice, this never happens... but pushing the event data into a global structure means that if the container is reused, then later function invocations will see the old data.

While this can be very useful as a cache, it has significant implications for how code must be designed -- always expect but never assume that your process may survive from one invocation to a future, subsequent invocation, and code accordingly.

Note that container reuse also means you need to clean up any temp files, if there is a chance that many reuses of a container will result in space exhaustion there. Note also that redeploying your function code always means that old containers will be abandoned, not reused for future invocations of the latest version.
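Applied to the code in the question, this points to scoping per-invocation state to the handler call: either call eventFiles.clear() as the first statement of gdeltHandler, or make the list local to the invocation. A minimal runnable sketch of the local-state variant (plain Java, no AWS types; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class PerInvocationDemo {
    // Fixed pattern: the file list is built inside the handler, so a reused
    // container (and therefore a reused handler instance) carries nothing over.
    public int handle(List<String> newKeys) {
        List<String> eventFiles = new ArrayList<>(newKeys); // local, not a field
        return eventFiles.size(); // only this invocation's files get processed
    }

    public static void main(String[] args) {
        // Even with one long-lived instance, every call starts from scratch.
        PerInvocationDemo warm = new PerInvocationDemo();
        System.out.println(warm.handle(List.of("0831123000.export.csv"))); // 1
        System.out.println(warm.handle(List.of("0831124500.export.csv"))); // 1
        System.out.println(warm.handle(List.of("0831130000.export.csv"))); // 1
    }
}
```

The equivalent one-line fix in the original class is to clear the instance list on entry to the handler; making the state local just removes the opportunity to forget that.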