如何通过管道将多个api请求中的多个可读流传输到单个可写流? [英] How to pipe multiple readable streams, from multiple api requests, to a single writeable stream?
问题描述
-期望的行为
-实际行为
-我尝试过的
-复制步骤
-研究
- Desired Behaviour
- Actual Behaviour
- What I've Tried
- Steps To Reproduce
- Research
期望的行为
将从多个api请求接收的多个可读流放入单个可写流.
Pipe multiple readable streams, received from multiple api requests, to a single writeable stream.
api响应来自ibm-watson的 textToSpeech .synthesize()方法.
The api responses are from ibm-watson's textToSpeech.synthesize() method.
需要多个请求的原因是因为该服务对文本输入设置了5KB
限制.
The reason multiple requests are required is because the service has a 5KB
limit on text input.
因此,例如,字符串18KB
需要四个请求才能完成.
Therefore a string of 18KB
, for example, requires four requests to complete.
实际行为
可写流文件不完整且出现乱码.
The writeable stream file is incomplete and garbled.
应用程序似乎挂起".
The application seems to 'hang'.
当我尝试在音频播放器中打开不完整的.mp3
文件时,它说文件已损坏.
When I try and open the incomplete .mp3
file in an audio player, it says it is corrupted.
打开和关闭文件的过程似乎增加了文件大小-就像打开文件一样,它提示更多数据流入其中.
The process of opening and closing the file seems to increase its file size - like opening the file somehow prompts more data to flow in to it.
对于较大的输入,例如四个4000字节或更少的字符串,不良行为更加明显.
Undesirable behaviour is more apparent with larger inputs, eg four strings of 4000 bytes or less.
我尝试过的事情
我尝试了几种使用npm包将可读流传输到单个可写流或多个可写流的方法.组合流,组合流2 , 多数据流和
I've tried several methods to pipe the readable streams to either a single writeable stream or multiple writeable streams using the npm packages combined-stream, combined-stream2, multistream and archiver and they all result in incomplete files. My last attempt doesn't use any packages and is shown in the Steps To Reproduce
section below.
因此,我要质疑应用程序逻辑的每个部分:
I am therefore questioning each part of my application logic:
01..Watson文本对语音API请求的响应类型是什么?
01. What is the response type of a watson text to speech api request?
文本到语音文档 ,假设api响应类型为:
The text to speech docs, say the api response type is:
Response type: NodeJS.ReadableStream|FileObject|Buffer
我很困惑,响应类型是三种可能的事情之一.
I am confused that the response type is one of three possible things.
在所有尝试中,我一直假设它是readable stream
.
In all my attempts, I have been assuming it is a readable stream
.
02..我可以在地图函数中发出多个api请求吗?
02. Can I make multiple api requests in a map function?
03..我可以将每个请求包装在promise()
中并解析response
吗?
03. Can I wrap each request within a promise()
and resolve the response
?
04..我可以将结果数组分配给promises
变量吗?
04. Can I assign the resulting array to a promises
variable?
05.我可以声明var audio_files = await Promise.all(promises)
吗?
06..声明之后,所有答复都完成"了吗?
06. After this declaration, are all responses 'finished'?
07..如何正确将每个响应传递给可写流?
07. How do I correctly pipe each response to a writable stream?
08..如何检测所有管道何时完成,以便可以将文件发送回客户端?
08. How do I detect when all pipes have finished, so I can send file back to client?
对于问题2-6,我假设答案为是".
For questions 2 - 6, I am assuming the answer is 'YES'.
我认为我的失败与问题7和8有关.
I think my failures relate to question 7 and 8.
复制步骤
您可以使用四个随机生成的文本字符串组成的数组来测试此代码,这些文本字符串的各自字节大小分别为3975
,3863
,3974
和3629
个字节-
You can test this code with an array of four randomly generated text strings with a respective byte size of 3975
, 3863
, 3974
and 3629
bytes - here is a pastebin of that array.
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
官方示例显示:
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
据我所知,这似乎对单个请求有效,但对多个请求却无效.
which seems to work fine for single requests, but not for multiple requests, as far as I can tell.
研究
关于可读和可写流,可读流模式(流动和暂停),数据",结束",排出"和完成"事件,pipe(),fs.createReadStream()和fs. createWriteStream()
几乎所有Node.js应用程序,无论多么简单,都以某种方式使用流...
Almost all Node.js applications, no matter how simple, use streams in some manner...
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
可读流具有两种主要模式,这些模式会影响我们的消费方式...它们可以处于
paused
模式或flowing
模式.默认情况下,所有可读流都以暂停模式开始,但是可以轻松将它们切换到flowing
,并在需要时切换回paused
...只需添加data
事件处理程序,就会将暂停的流切换为flowing
模式并删除data
事件处理程序将流切换回paused
模式.
Readable streams have two main modes that affect the way we can consume them...they can be either in the
paused
mode or in theflowing
mode. All readable streams start in the paused mode by default but they can be easily switched toflowing
and back topaused
when needed...just adding adata
event handler switches a paused stream intoflowing
mode and removing thedata
event handler switches the stream back topaused
mode.
https://www .freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
以下是可与可读写流一起使用的重要事件和功能的列表
Here’s a list of the important events and functions that can be used with readable and writable streams
可读流上最重要的事件是:
The most important events on a readable stream are:
data
事件,每当流将大量数据传递给使用者时都会发出此事件
end
事件,当没有更多数据要从流中使用时发出.
The data
event, which is emitted whenever the stream passes a chunk of data to the consumer
The end
event, which is emitted when there is no more data to be consumed from the stream.
可写流上最重要的事件是:
The most important events on a writable stream are:
drain
事件,这是可写流可以接收更多数据的信号.
finish
事件,当所有数据都已刷新到基础系统时发出.
The drain
event, which is a signal that the writable stream can receive more data.
The finish
event, which is emitted when all data has been flushed to the underlying system.
https://www .freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()
负责侦听fs.createReadStream()
中的数据"和结束"事件.
.pipe()
takes care of listening for 'data' and 'end' events from thefs.createReadStream()
.
https://github.com/substack/stream -handbook#为什么要使用流
.pipe()
只是一个函数,它采用可读的源流src并将输出挂钩到目标可写流dst
.pipe()
is just a function that takes a readable source stream src and hooks the output to a destination writable streamdst
https://github.com/substack/stream-handbook#pipe
pipe()
方法的返回值是目标流
The return value of the
pipe()
method is the destination stream
https://flaviocopes.com/nodejs-streams/#pipe >
https://flaviocopes.com/nodejs-streams/#pipe
默认情况下,在目标
stream.end()当源 Readable
流发出'end'
时,该流不再可用,因此目标不再可写.要禁用此默认行为,可以将end
选项作为false
传递,从而使目标流保持打开状态:
By default, stream.end() is called on the destination
Writable
stream when the sourceReadable
stream emits'end'
, so that the destination is no longer writable. To disable this default behavior, theend
option can be passed asfalse
, causing the destination stream to remain open:
在调用
stream.end()
方法并将所有数据都刷新到基础系统之后,会发出'finish'
事件.
The
'finish'
event is emitted after thestream.end()
method has been called, and all data has been flushed to the underlying system.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
如果您尝试读取多个文件并将其通过管道传输到可写流,则必须将每个文件通过管道传输到可写流,并在执行时传递
end: false
,因为默认情况下,可读流会结束可写流当没有更多数据要读取时流式传输.这是一个示例:
If you're trying to read multiple files and pipe them to a writable stream, you have to pipe each one to the writable stream and and pass
end: false
when doing it, because by default, a readable stream ends the writable stream when there's no more data to be read. Here's an example:
var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://stackoverflow.com/a/30916248
您要将第二个阅读添加到事件监听器中,以使第一个阅读完成...
You want to add the second read into an eventlistener for the first read to finish...
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}
https://stackoverflow.com/a/28033554
节点流简史-部分一个和
A Brief History of Node Streams - part one and two.
相关的Google搜索:
Related Google search:
如何将多个可读流传递到单个可写流? nodejs
how to pipe multiple readable streams to a single writable stream? nodejs
涵盖相同或相似主题的问题,没有权威性答案(或者可能过时"):
Questions covering the same or similar topic, without authoritative answers (or might be 'outdated'):
如何将多个ReadableStreams传递到单个WriteStream?/a>
How to pipe multiple ReadableStreams to a single WriteStream?
推荐答案
此处要解决的核心问题是异步性.您几乎拥有了它:发布代码的问题在于,您正在并行地并行处理所有源流.无序进入目标流.这意味着data
块将随机地从不同的音频流中流出-即使您的end
事件也将超越pipe
,而没有end
太早关闭目标流,这可能解释了为什么重新打开目标流后它会增加
The core problem to solve here is asynchronicity. You almost had it: the problem with the code you posted is that you are piping all source streams in parallel & unordered into the target stream. This means data
chunks will flow randomly from different audio streams - even your end
event will outrace the pipe
s without end
closing the target stream too early, which might explain why it increases after you re-open it.
您想要的是按顺序输送它们-甚至在引用时发布了解决方案
What you want is to pipe them sequentially - you even posted the solution when you quoted
您要将第二个阅读添加到事件监听器中,以使第一个阅读完成...
You want to add the second read into an eventlistener for the first read to finish...
或作为代码:
a.pipe(c, { end:false });
a.on('end', function() {
b.pipe(c);
}
这会将源流按顺序输送到目标流中.
This will pipe the source streams in sequential order into the target stream.
采用您的代码,这意味着将audio_files.forEach
循环替换为:
Taking your code this would mean to replace the audio_files.forEach
loop with:
await Bluebird.mapSeries(audio_files, async (audio, index) => {
const isLastIndex = index == audio_files_length - 1;
audio.pipe(write_stream, { end: isLastIndex });
return new Promise(resolve => audio.on('end', resolve));
});
在此处注意 bluebird.js mapSeries 的用法.
有关您的代码的其他建议:
Further advice regarding your code:
- 您应该考虑使用 lodash.js
- 您应该使用
const
&let
代替var
并考虑使用camelCase
- 当您注意到它只能在一个事件中起作用,而在多个事件中却失败"时,总会想到:异步性,排列,竞争条件.
- you should consider using lodash.js
- you should use
const
&let
instead ofvar
and consider usingcamelCase
- when you notice "it works with one event, but fails with multiple" always think: asynchronicity, permutations, race conditions.
进一步阅读,组合本机节点流的限制: https://github.com/nodejs/node/issues/93
Further reading, limitations of combining native node streams: https://github.com/nodejs/node/issues/93
这篇关于如何通过管道将多个api请求中的多个可读流传输到单个可写流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!