How to stream read an S3 JSON file to postgreSQL using async/await in a NodeJS 12 Lambda function?


Question


I didn't realize how perilous such a simple task could be. We're trying to stream-read a JSON file stored in S3--I think we have that part working. Our .on('data') callback is getting called, but Node picks and chooses what bits it wants to run--seemingly at random.

We set up the stream reader:

stream.on('data', async x => { 
  await saveToDb(x);  // This doesn't await.  It processes saveToDb up until it awaits.
});


Sometimes the db call makes it to the db--but most of the time it doesn't. I've come to the conclusion that EventEmitter has problems with async/await event handlers. It appears as though it will play along with your async method so long as your code is synchronous. But, at the point you await, it randomly decides whether to actually follow through with doing it or not.


It streams the various chunks and we can console.log them out and see the data. But as soon as we try to fire off an await/async call, we stop seeing reliable messages.
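For context on why that happens: EventEmitter calls its 'data' listeners synchronously and ignores any promise an async listener returns, so the stream keeps emitting chunks while saveToDb is still pending. A minimal sketch of the usual manual workaround, reusing the saveToDb call from the snippet above, is to pause the stream before awaiting and resume it afterwards:

stream.on('data', async x => {
  stream.pause();        // stop further 'data' events while this chunk is being saved
  try {
    await saveToDb(x);   // the asker's async db call
  } finally {
    stream.resume();     // only ask for the next chunk once this one is done
  }
});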


I'm running this in AWS Lambda and I've been told that there are special considerations because apparently they halt processing in some cases?
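Those special considerations are most likely about Lambda freezing the execution environment as soon as the async handler's returned promise settles: any db writes that were started but never awaited can be left suspended mid-flight. As a rough illustration only (reusing the stream and saveToDb names from above, not code from the question), the handler would have to hold on to every pending write and refuse to resolve until all of them settle:

exports.handler = async (event) => {
  const pending = [];
  stream.on('data', x => pending.push(saveToDb(x))); // collects promises; no backpressure, illustration only
  await new Promise((resolve, reject) => stream.on('end', resolve).on('error', reject));
  await Promise.all(pending); // make sure every write finished before the handler returns
};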


I tried surrounding the await call in an IIFE, but that didn't work, either.


What am I missing? Is there no way of telling JavaScript--"Okay, I need you to run this async task synchronously. I mean it--don't go and fire off any more event notifications, either. Just sit here and wait."?

Answer

TL;DR:

  • Use an async iterator to pull from the end of your stream pipeline!
  • Don't use async functions in any of your stream code!

Details:


The secret to life's mystery regarding async/await and streams appears to be wrapped up in Async Iterators!


In short, I piped some streams together and at the very end, I created an async iterator to pull stuff out of the end so that I could asynchronously call the db. The only thing ChunkStream does for me is queue up to 1,000 items to call the db with, instead of calling it for each item. I'm new to queues, so there may already be a better way of doing that.

// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');

exports.handler = async (event, context) => {
    // ...
    let bucket, key;
    try {
        bucket = event.Records[0].s3.bucket.name;
        key = event.Records[0].s3.object.key;
        console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
        const parser = JSONStream.parse('*'); // Converts file to JSON objects
        let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
        let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
        
        let totalProcessed = 0;
        async function processChunk(chunk) {
            let chunkString = JSON.stringify(chunk);
            console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) to the db.`);
            await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
            totalProcessed += chunk.length;
        }
        
        // Async iterator
        for await (const batch of endStream) {
            // console.log(`Processing batch (${batch.length})`, batch);
            await processChunk(batch);
        }
    } catch (ex) {
        context.fail("stream S3 file failed");
        throw ex;
    }
};

class ChunkStream extends stream.Transform {
    constructor(maxItems, options = {}) {
        options.objectMode = true;
        super(options);
        this.maxItems = maxItems;
        this.batch = [];
    }
    _transform(item, enc, cb) {
        this.batch.push(item);
        if (this.batch.length >= this.maxItems) {
            // console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
            this.push(this.batch);
            // console.log('_transform - Restarting the batch');
            this.batch = [];
        }
        cb();
    }
    _flush(cb) {
        // console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
        if (this.batch.length > 0) {
            this.push(this.batch);
            this.batch = [];
        }
        cb();
    }
}
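
The answer does not show updateDb or pool. Purely as a hypothetical sketch of what they might look like with the node-postgres (pg) package, assuming an items(id bigint primary key, payload jsonb) table that the original never specifies:

const { Pool } = require('pg');
const pool = new Pool(); // hypothetical: connection settings taken from the PG* environment variables

// Hypothetical stand-in for the missing updateDb: parse the chunk string and upsert each item.
async function updateDb(chunkString, pool, batchSize) {
    const items = JSON.parse(chunkString); // JSONbigint.parse, because of the override above
    for (const item of items) {
        // One upsert per item; batchSize is unused here -- a real version would build batched statements.
        await pool.query(
            'INSERT INTO items (id, payload) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload',
            [item.id, JSON.stringify(item)]
        );
    }
}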

