How to pipe multiple readable streams, from multiple api requests, to a single writeable stream?


Problem Description

- Desired Behaviour
- Actual Behaviour
- What I've Tried
- Steps To Reproduce
- Research

Desired Behaviour

Pipe multiple readable streams, received from multiple api requests, to a single writeable stream.

The api responses are from ibm-watson's textToSpeech.synthesize() method.

The reason multiple requests are required is because the service has a 5KB limit on text input.

Therefore a string of 18KB, for example, requires four requests to complete.
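For illustration, the splitting step might be sketched like this (`chunkText` is a hypothetical helper, not part of the Watson SDK; the 5KB figure is the limit quoted above):

```javascript
// Split a long text into pieces that each stay under maxBytes,
// preferring to break at sentence boundaries.
function chunkText(text, maxBytes = 5000) {
  const chunks = [];
  let current = '';
  for (const sentence of text.split(/(?<=[.!?])\s+/)) {
    const candidate = current ? current + ' ' + sentence : sentence;
    if (Buffer.byteLength(candidate, 'utf8') > maxBytes && current) {
      chunks.push(current);   // current piece is full, start a new one
      current = sentence;
    } else {
      current = candidate;
    }
  }
  if (current) chunks.push(current);
  return chunks;
}
```

Each resulting chunk would then become one synthesize request.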

Actual Behaviour

The writeable stream file is incomplete and garbled.

The application seems to 'hang'.

When I try and open the incomplete .mp3 file in an audio player, it says it is corrupted.

The process of opening and closing the file seems to increase its file size - like opening the file somehow prompts more data to flow in to it.

Undesirable behaviour is more apparent with larger inputs, eg four strings of 4000 bytes or less.

What I've Tried

I've tried several methods to pipe the readable streams to either a single writeable stream or multiple writeable streams using the npm packages combined-stream, combined-stream2, multistream and archiver and they all result in incomplete files. My last attempt doesn't use any packages and is shown in the Steps To Reproduce section below.

I am therefore questioning each part of my application logic:

01. What is the response type of a watson text to speech api request?

The text to speech docs say the api response type is:

Response type: NodeJS.ReadableStream|FileObject|Buffer

I am confused that the response type is one of three possible things.

In all my attempts, I have been assuming it is a readable stream.

02. Can I make multiple api requests in a map function?

03. Can I wrap each request within a promise() and resolve the response?

04. Can I assign the resulting array to a promises variable?

05. Can I declare var audio_files = await Promise.all(promises)?

06. After this declaration, are all responses 'finished'?

07. How do I correctly pipe each response to a writable stream?

08. How do I detect when all pipes have finished, so I can send the file back to the client?

For questions 2 - 6, I am assuming the answer is 'YES'.

I think my failures relate to questions 7 and 8.

Steps To Reproduce

You can test this code with an array of four randomly generated text strings with a respective byte size of 3975, 3863, 3974 and 3629 bytes - here is a pastebin of that array.

// route handler
app.route("/api/:api_version/tts")
    .get(api_tts_get);

// route handler middleware
const api_tts_get = async (req, res) => {

    var query_parameters = req.query;

    var file_name = query_parameters.file_name;
    var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV

    var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
    var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root

    // for each string in an array, send it to the watson api  
    var promises = text_string_array.map(text_string => {

        return new Promise((resolve, reject) => {

            // credentials
            var textToSpeech = new TextToSpeechV1({
                iam_apikey: iam_apikey,
                url: tts_service_url
            });

            // params  
            var synthesizeParams = {
                text: text_string,
                accept: 'audio/mp3',
                voice: 'en-US_AllisonV3Voice'
            };

            // make request  
            textToSpeech.synthesize(synthesizeParams, (err, audio) => {
                if (err) {
                    console.log("synthesize - an error occurred: ");
                    return reject(err);
                }
                resolve(audio);
            });

        });
    });

    try {
        // wait for all responses
        var audio_files = await Promise.all(promises);
        var audio_files_length = audio_files.length;

        var write_stream = fs.createWriteStream(`${relative_path}.mp3`);

        audio_files.forEach((audio, index) => {

            // if this is the last value in the array, 
            // pipe it to write_stream, 
            // when finished, the readable stream will emit 'end' 
            // then the .end() method will be called on write_stream  
            // which will trigger the 'finished' event on the write_stream    
            if (index == audio_files_length - 1) {
                audio.pipe(write_stream);
            }
            // if not the last value in the array, 
            // pipe to write_stream and leave open 
            else {
                audio.pipe(write_stream, { end: false });
            }

        });

        write_stream.on('finish', function() {

            // download the file (using absolute_path)  
            res.download(`${absolute_path}.mp3`, (err) => {
                if (err) {
                    console.log(err);
                }
                // delete the file (using relative_path)  
                fs.unlink(`${relative_path}.mp3`, (err) => {
                    if (err) {
                        console.log(err);
                    }
                });
            });

        });


    } catch (err) {
        console.log("there was an error getting tts");
        console.log(err);
    }

}

The official example shows:

textToSpeech.synthesize(synthesizeParams)
  .then(audio => {
    audio.pipe(fs.createWriteStream('hello_world.mp3'));
  })
  .catch(err => {
    console.log('error:', err);
  });

which seems to work fine for single requests, but not for multiple requests, as far as I can tell.

Research

Regarding readable and writable streams, readable stream modes (flowing and paused), the 'data', 'end', 'drain' and 'finish' events, pipe(), fs.createReadStream() and fs.createWriteStream():

Almost all Node.js applications, no matter how simple, use streams in some manner...

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a Readable Stream
  // `res` is an http.ServerResponse, which is a Writable Stream

  let body = '';
  // get the data as utf8 strings.
  // if an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });

  // the 'end' event indicates that the entire body has been received
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

https://nodejs.org/api/stream.html#stream_api_for_stream_consumers

Readable streams have two main modes that affect the way we can consume them...they can be either in the paused mode or in the flowing mode. All readable streams start in the paused mode by default but they can be easily switched to flowing and back to paused when needed...just adding a data event handler switches a paused stream into flowing mode and removing the data event handler switches the stream back to paused mode.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93

Here’s a list of the important events and functions that can be used with readable and writable streams

The most important events on a readable stream are:

The data event, which is emitted whenever the stream passes a chunk of data to the consumer. The end event, which is emitted when there is no more data to be consumed from the stream.

The most important events on a writable stream are:

The drain event, which is a signal that the writable stream can receive more data. The finish event, which is emitted when all data has been flushed to the underlying system.

https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93

.pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream().

https://github.com/substack/stream-handbook#why-you-should-use-streams

.pipe() is just a function that takes a readable source stream src and hooks the output to a destination writable stream dst

https://github.com/substack/stream-handbook#pipe

The return value of the pipe() method is the destination stream

https://flaviocopes.com/nodejs-streams/#pipe



By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open:

https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options

The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.log('All writes are now complete.');
});

https://nodejs.org/api/stream.html#stream_event_finish

If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and pass end: false when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:

var ws = fs.createWriteStream('output.pdf');

fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);

https://stackoverflow.com/a/30916248

You want to add the second read into an eventlistener for the first read to finish...

var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
  b.pipe(c);
});

https://stackoverflow.com/a/28033554

A Brief History of Node Streams - part one and two.

Related Google search:

how to pipe multiple readable streams to a single writable stream? nodejs

Questions covering the same or similar topic, without authoritative answers (or might be 'outdated'):

How to pipe multiple ReadableStreams to a single WriteStream?

Writing twice to the same writable stream via different Readable streams

Getting multiple files into one response

Creating a Node.js stream out of two piped streams

Accepted Answer

The core problem to solve here is asynchronicity. You almost had it: the problem with the code you posted is that you are piping all source streams in parallel, unordered, into the target stream. This means data chunks will flow randomly from the different audio streams, and even your end event can outrace the other pipes, closing the target stream too early, which might explain why the file grows after you re-open it.

What you want is to pipe them sequentially - you even posted the solution when you quoted

You want to add the second read into an eventlistener for the first read to finish...

Or as code:

a.pipe(c, { end:false });
a.on('end', function() {
  b.pipe(c);
});

This will pipe the source streams in sequential order into the target stream.

Taking your code this would mean to replace the audio_files.forEach loop with:

await Bluebird.mapSeries(audio_files, async (audio, index) => {  
    const isLastIndex = index == audio_files_length - 1;
    audio.pipe(write_stream, { end: isLastIndex });
    return new Promise(resolve => audio.on('end', resolve));
});

Note the use of bluebird.js mapSeries here.

Further advice regarding your code:

  • you should consider using lodash.js
  • you should use const & let instead of var and consider using camelCase
  • when you notice "it works with one event, but fails with multiple" always think: asynchronicity, permutations, race conditions.

Further reading, limitations of combining native node streams: https://github.com/nodejs/node/issues/93
