NodeJS流不等待异步 [英] NodeJS streams not awaiting async

查看:70
本文介绍了NodeJS流不等待异步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在测试NodeJS流时遇到了一个问题.我似乎无法让我的项目在运行stream.pipeline之后等待双工和Transform流的输出,即使它正在返回承诺.也许我缺少了一些东西,但是我相信脚本应该在继续执行之前等待函数返回.我要开始工作的项目中最重要的部分是:

I have run into an issue when testing NodeJS streams. I can't seem to get my project to wait for the output from the Duplex and Transform streams after running a stream.pipeline, even though it is returning a promise. Perhaps I'm missing something, but I believe that the script should wait for the function to return before continuing. The most important part of the project I'm trying to get working is:

// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _read(size: number): void {
        var chunk = this.read();
        console.log(`Recieved ${chunk}`);
        this.push(chunk);
    }
    public _write(chunk: Message, encoding: string, 
        callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
        if (chunk.data === null) {
            callback(new Error("Message.Data is null"));
        } else {
            callback();
        }
    }
}

export class SystemStream extends Transform {
    public type: MessageType = MessageType.Global;
    public data: Array<Message> = new Array<Message>();
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string, 
        callback: TransformCallback): void {
        if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
            console.log(`Adding ${chunk}`);
            this.data.push(chunk);
            chunk = new Message(chunk.data, MessageType.Removed, true);
            callback(undefined, chunk); // TODO: Is this correct?
        } else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
            this.data.push(chunk);
            callback(undefined, chunk);
        } else { // Not ours
            callback(undefined, chunk);
        }
    }
}

export class EngineStream extends SystemStream {
    public type: MessageType = MessageType.Engine;
}

export class IOStream extends SystemStream {
    public type: MessageType = MessageType.IO;
}

let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();

let pipeline = promisify(Stream.pipeline);

async function start() {
    console.log("Running Message System");
    console.log("Writing new messages");
    ms.write(new Message("Hello"));
    ms.write(new Message("world!"));
    ms.write(new Message("Engine data", MessageType.Engine));
    ms.write(new Message("IO data", MessageType.IO));
    ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
    ms.end(new Message("Final message in the stream"));
    console.log("Piping data");
    await pipeline(
        ms,
        es,
        io
    );
}

Promise.all([start()]).then(() => {
    console.log(`Engine Messages to parse: ${es.data.toString()}`);
    console.log(`IO Messages to parse: ${io.data.toString()}`);
});

输出应类似于:

Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data

任何帮助将不胜感激.谢谢!

Any help would be greatly appreciated. Thanks!

注意:我用其他帐户发布了该帐户,而不是我的实际帐户.道歉的重复.

Note: I posted this with my other account, and not this one that is my actual account. Apologies for the duplicate.

最初,我有私有的repo,但是为了帮助弄清答案,已经公开了.可以在 feature/inital_system分支中找到更多用法.签出时可以使用 npm start 运行.

I initially had the repo private, but have made it public to help clarify the answer. More usage can be found on the feature/inital_system branch. It can be run with npm start when checked out.

为了方便,我将自定义流放在此处.我认为我的状态比以前更好,但是现在在管道中收到了一个空"对象.

I've put my custom streams here for verbosity. I think I'm on a better track than before, but now getting a "null" object recieved down the pipeline.

推荐答案

在过去几天的工作之后,我找到了答案.问题是我对Duplex流的实现.此后,我将 MessageSystem 更改为Transform流,以便于管理和使用.

After some work of the past couple of days, I've found my answer. The issue was my implementation of the Duplex stream. I have since changed the MessageSystem to be a Transform stream to be easier to manage and work with.

这是产品:

export class MessageSystem extends Transform {
    constructor() {
        super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
    }
    public _transform(chunk: Message, encoding: string,
        callback: TransformCallback): void {
            try {
                let output: string = chunk.toString();
                callback(undefined, output);
            } catch (err) {
                callback(err);
            }
        }
}

感谢@estus进行快速回复并进行检查.同样,我一直在API中找到答案!

Thank you to @estus for the quick reply and check. Again, I find my answer in the API all along!

可以在此存储库中找到我的发现的存档存储库..

An archived repository of my findings can be found in this repository.

这篇关于NodeJS流不等待异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆