Implementing a buffered transform stream
Question
I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become a pass-through. The catch is that this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it has already been flushed to another stream.
For example:
- BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer.
- ReadableStreamA is piped to an instance of BufferStream.
- BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
- BufferStream is piped to WritableStreamB.
- WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
- BufferStream is piped to WritableStreamC.
- WritableStreamC also receives the entire 512KB buffer, but this buffer is now different from what WritableStreamB received, because more data has since been written to BufferStream.
Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
Recommended answer
Here's my take on your issue.
The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:
```javascript
var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)
  this.push(chunk)
  done()
}
```
Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into a stream, which allows us to automatically write data to it:
```javascript
BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}
```
In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.
Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't care about the size of the internal buffer), but this should be easy to fix.
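One easy fix, for instance, is to cap the buffer at the 512KB target by keeping only the most recent bytes. A sketch; capBuffer is a hypothetical helper, not part of the gist:

```javascript
// Hypothetical helper: append a chunk and keep only the last `limit` bytes.
function capBuffer (buffer, chunk, limit) {
  var joined = Buffer.concat([buffer, chunk])
  return joined.length > limit ? joined.slice(joined.length - limit) : joined
}

// Inside _transform it would replace the single-chunk assignment:
//   this.buffer = capBuffer(this.buffer, chunk, 512 * 1024)
```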