Possible to make an event handler wait until async / Promise-based code is done?

Question

I am using the excellent Papa Parse library in Node.js mode to stream a large (500 MB) CSV file of over 1 million rows into a slow persistence API that can only take one request at a time. The persistence API is based on Promises, but from Papa Parse I receive each parsed CSV row in a synchronous event, like so: parseStream.on("data", row => { ... })

The challenge I am facing is that Papa Parse dumps its CSV rows from the stream so fast that my slow persistence API can't keep up. Because Papa is synchronous and my API is Promise-based, I can't just call await doDirtyWork(row) in the on event handler, because sync and async code don't mix.

Or can they mix and I just don't know how?
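
As far as I can tell, they mix only syntactically: making the listener async is legal, but the event emitter ignores the returned promise, so rows keep arriving while the slow work is still pending. A minimal sketch of that trap, with a hypothetical doDirtyWork standing in for my persistence call:

parseStream.on("data", async row => {
    // Legal, but Papa/Node never awaits the returned promise, so "data"
    // events keep firing while doDirtyWork is still in flight.
    await doDirtyWork(row);
});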

My question is, can I make Papa's event handler wait for my API call to finish? Kind of doing the persistence API request directly in the on("data") event, making the on() function linger around somehow until the dirty API work is done?

The solution I have so far is not much better than using Papa's non-streaming mode, in terms of memory footprint. I actually need to queue up the torrent of on("data") events, in the form of generator function iterations. I could have also queued up promise factories in an array and worked them off in a loop (see the sketch after the code below). Either way, I end up saving almost the entire CSV file as a huge collection of future Promises (promise factories) in memory, until my slow API calls have worked all the way through.

const fs = require("fs");
const Papa = require("papaparse");

async function importCSV(filePath) {
    let parsedNum = 0, processedNum = 0;

    // Runs the promise factories passed in via next(pf), one at a time;
    // stops when something other than a function is passed in.
    async function* gen() {
        let pf = yield;
        do {
            pf = yield await pf();
        } while (typeof pf === "function");
    }

    const g = gen();
    g.next(); // prime the generator so it waits at the first yield

    await new Promise((resolve, reject) => {
        try {
            const dataStream = fs.createReadStream(filePath);
            const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
            dataStream.pipe(parseStream);

            parseStream.on("data", row => {

                // Received a CSV row from Papa.parse()

                try {
                    console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
                    );
                    parsedNum++;

                    // Simulate some really slow async/await dirty work here, for example 
                    // send requests to a one-at-a-time persistence API

                    g.next(() => {  // don't execute now, call in sequence via the generator above
                        return new Promise((res, rej) => {
                            console.log(
                                "DW#", processedNum, ": dirty work START",
                                row.filter((e, i) => i <= 2 ? e : undefined)
                            );
                            setTimeout(() => {
                                console.log(
                                    "DW#", processedNum, ": dirty work STOP ",
                                    row.filter((e, i) => i <= 2 ? e : undefined)
                                );
                                processedNum++;
                                res();
                            }, 1000)
                        })
                    });
                } catch (err) {
                    console.log(err.stack);
                    reject(err);                    
                }
            });
            parseStream.on("finish", () => {
                console.log(`Parsed ${parsedNum} rows`);
                resolve();
            });

        } catch (err) {
            console.log(err.stack);
            reject(err);                    
        }
    });
    // Drain the generator: passing no value makes pf undefined, which ends
    // its loop once all queued promise factories have been awaited.
    while (!(await g.next()).done);
}
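
For comparison, the promise-factory-array variant mentioned above might look like the sketch below (same hypothetical doDirtyWork; it suffers the same memory problem, since every factory closes over its row):

const factories = [];

parseStream.on("data", row => {
    // Nothing runs yet; each factory just closes over its row.
    factories.push(() => doDirtyWork(row));
});

parseStream.on("finish", async () => {
    for (const factory of factories) {
        await factory(); // one slow API call at a time
    }
});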

So why the rush, Papa? Why not allow me to work down the file a bit slower -- the data in the original CSV file isn't gonna run away; we have hours to finish the streaming. Why hammer me with on("data") events that I can't seem to slow down?

So what I really need is for Papa to become more of a grandpa, and minimize or eliminate any queuing or buffering of CSV rows. Ideally I would be able to completely sync Papa's parsing events with the speed (or lack thereof) of my API. So if it weren't for the dogma that async code can't make sync code "sleep", I would ideally send each CSV row to the API inside the Papa event, and only then return control to Papa.

Suggestions? Some kind of "loose coupling" of the event handler with the slowness of my async API is fine too. I don't mind if a few hundred rows get queued up. But when tens of thousands pile up, I will run out of heap fast.
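
For illustration, the kind of loose coupling I'd settle for might be a bounded buffer with high and low water marks -- a sketch only, with assumed limits and the same hypothetical doDirtyWork:

const buffer = [];
let draining = false;

parseStream.on("data", row => {
    buffer.push(row);
    if (buffer.length >= 500) parseStream.pause(); // high-water mark: stop the flood
    if (!draining) drain();
});

async function drain() {
    draining = true;
    while (buffer.length > 0) {
        await doDirtyWork(buffer.shift()); // one slow API call at a time
        if (buffer.length <= 100) parseStream.resume(); // low-water mark: refill
    }
    draining = false;
}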

Answer

Why hammer me with on("data") events that I can't seem to slow down?

You can, you just weren't asking Papa to stop. You can do that by calling stream.pause() and, later, stream.resume(), to make use of the back-pressure built into Node streams.
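
In callback style that might look like the following sketch (doDirtyWork again stands in for the slow persistence call):

parseStream.on("data", row => {
    parseStream.pause(); // no further "data" events until resume()
    doDirtyWork(row)
        .then(() => parseStream.resume())
        .catch(err => parseStream.destroy(err));
});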

However, there's a much nicer API to use than managing this yourself in callback-based code: consume the stream as an async iterator! When you await in the body of a for await loop, the iteration -- and with it the stream -- has to pause as well. So you can write:

const fs = require("fs");
const Papa = require("papaparse");

async function importCSV(filePath) {
    let parsedNum = 0;

    const dataStream = fs.createReadStream(filePath);
    const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
    dataStream.pipe(parseStream);

    for await (const row of parseStream) {
        // Received a CSV row from Papa.parse()
        const data = row.filter((e, i) => i <= 2 ? e : undefined); // first three columns, for logging
        console.log("PA#", parsedNum, ": parsed", data);
        parsedNum++;
        await dirtyWork(data);
    }
    console.log(`Parsed ${parsedNum} rows`);
}

importCSV('sample.csv').catch(console.error);

let processedNum = 0;
function dirtyWork(data) {
    // Simulate some really slow async/await dirty work here,
    // for example send requests to a one-at-a-time persistence API
    return new Promise((res, rej) => {
        console.log("DW#", processedNum, ": dirty work START", data)
        setTimeout(() => {
             console.log("DW#", processedNum, ": dirty work STOP ", data);
             processedNum++;
             res();
        }, 1000);
    });
}
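
(Readable streams have been async-iterable since Node 10, so the for await loop above needs no extra library; on older Node versions, the pause()/resume() approach still applies.)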
