S3 files being processed multiple times in AWS Lambda


Problem description

I have a Java Lambda function that is triggered by an S3 Event every 15 minutes. I've noticed that roughly every 3 hours, each Lambda invocation contains not only the most recently uploaded file but also every file uploaded earlier within that 3-hour window.

So when it iterates over the whole List, it reprocesses files that were already handled in earlier Lambda invocations.

How can I process only the most recently uploaded files? In Node.js there is context.succeed(), which I assume marks the event as successfully processed, but Java doesn't seem to have an equivalent.

Below are the CloudWatch logs.

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/08/31/20160831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/08/31/20160831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/08/31/20160831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/08/31/20160831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/08/31/20160831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/08/31/20160831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc

Edit 1: I believe the issue has been answered by Michael, but below is some of the code for anyone else who runs into this. I am indeed using a global list to hold the records.

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.time.StopWatch;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// GDELTEventFile is the question author's own model class (bucket, key, size); it is not shown here.
public class LambdaHandler {

private final List<GDELTEventFile> eventFiles = new ArrayList<>();
private AmazonS3Client s3Client;
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();

public void gdeltHandler(S3Event event, Context context) {
    StopWatch sw = new StopWatch();
    long time = 0L;

    sw.start();
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
    sw.split();
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processEvent(event);
    sw.split();
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processFiles();
    sw.split();
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
    time += sw.getSplitTime();

    System.out.println("Found " + eventFiles.size() + " event files");
    System.out.println("Total function took: " + time + "ms");
}

private void processEvent(S3Event event) {
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
    for (S3EventNotification.S3EventNotificationRecord record : records) {
        long filesize = record.getS3().getObject().getSizeAsLong();
        eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize));
    }
}

private void processFiles() {
    for (GDELTEventFile event : eventFiles) {
        try {
            System.out.println(event.getBucket() + " :: " + event.getFilename());
            GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
            S3Object file = s3Client.getObject(request);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
                CSVParser parser = new CSVParser(reader, CSV_FORMAT);
                int count = 0;
                for (CSVRecord record : parser) {
                    count++;
                }
                System.out.println("Processed " + count + " events");
            }
        } catch (IOException ioe) {
            System.out.println("IOException :: " + ioe);
        }
    }
}
}

Solution


This is a case of code that overlooks an important aspect of Lambda's container reuse -- container reuse in Lambda includes process reuse. When a function is executed in a reused container, it's also necessarily running in the same process that was used before as well.
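To make that concrete, here is a small, hypothetical handler (not from the original post; it assumes only com.amazonaws.services.lambda.runtime.Context from the Lambda Java runtime library) showing that instance state survives across invocations whenever the same container, and therefore the same JVM process and handler object, is reused:

import com.amazonaws.services.lambda.runtime.Context;

public class ReuseDemoHandler {

    // Initialized once per process; survives across invocations in a reused container.
    private int invocationCount = 0;

    public String handleRequest(Object input, Context context) {
        invocationCount++;
        // On a cold start this returns 1; in a warm (reused) container it keeps growing.
        return "invocation #" + invocationCount + " in this container";
    }
}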

S3's event notification data structure is such that it can include more than one object per event, but in practice this never happens... but pushing the event data into a global structure means that if the container is reused, later function invocations will see the old data.

While this can be very useful as a cache, it has significant implications for how code must be designed -- expect, but never assume, that your process may survive from one invocation to a subsequent one, and code accordingly.
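For illustration, a minimal sketch (not part of the answer) of how the handler from the question could be adjusted: either reset the instance-level list at the start of every invocation, or keep the list local to the invocation and pass it along explicitly. The names mirror the question's code; processFiles is assumed to be changed to accept the list as a parameter.

public void gdeltHandler(S3Event event, Context context) {
    // Simplest fix: clear the instance-level list so nothing from a previous
    // invocation (same reused container/process) is carried over.
    eventFiles.clear();

    // More robust: avoid instance state entirely -- build a list local to
    // this invocation and hand it to the processing step.
    List<GDELTEventFile> currentFiles = new ArrayList<>();
    for (S3EventNotification.S3EventNotificationRecord record : event.getRecords()) {
        currentFiles.add(new GDELTEventFile(
                record.getS3().getBucket().getName(),
                record.getS3().getObject().getKey(),
                record.getS3().getObject().getSizeAsLong()));
    }
    processFiles(currentFiles); // assumes processFiles now takes the list as a parameter
}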

Note that container reuse also means you need to clean up any temp files, if there is a chance that many reuses of a container will result in space exhaustion there.
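A rough sketch of that cleanup (illustrative only, using java.io.File and assuming the function writes its scratch files directly under /tmp):

private void cleanTmpDir() {
    File[] leftovers = new File("/tmp").listFiles();
    if (leftovers == null) {
        return;
    }
    for (File f : leftovers) {
        // /tmp survives between invocations when the container is reused,
        // so delete old scratch files before writing new ones.
        if (f.isFile() && !f.delete()) {
            System.out.println("Could not delete temp file: " + f.getName());
        }
    }
}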

Note also that redeploying your function code always means that old containers will be abandoned, not reused for future invocations of the latest version.

