Implementing a buffered transform stream
Question
I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it has already been flushed to another stream.
For example:
1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer.
2. ReadableStreamA is piped to an instance of BufferStream.
3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
4. BufferStream is piped to WritableStreamB.
5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
6. BufferStream is piped to WritableStreamC.
7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different from what WritableStreamB received, because more data has since been written to BufferStream.
Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
Answer
Here's my take on your issue.
The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on to the output of the stream:
var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // Custom buffering logic goes here: append the chunk to this.buffer,
  // check the buffer size, overwrite old data, etc. As a placeholder we
  // simply append with no size limit.
  this.buffer = Buffer.concat([this.buffer, chunk])
  this.push(chunk)
  done()
}
Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into a stream, which allows us to automatically write the buffered data to it:
BufferStream.prototype.pipe = function (destination, options) {
  // Perform the normal pipe, then immediately replay the buffered
  // data into the new destination so it catches up.
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}
This way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of backpressure and whatnot.
Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't care about the size of the internal buffer), but this should be easy to fix.