实现缓冲转换流 [英] Implementing a buffered transform stream

查看:14
本文介绍了实现缓冲转换流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 新的 Node.js 流 API 实现流将缓冲一定数量的数据.当这个流通过管道传输到另一个流时,或者如果某些东西消耗了 可读 事件,这个流应该刷新它的缓冲区,然后简单地成为传递.问题是,此流将通过管道传输到许多其他流,并且当附加每个目标流时,必须刷新缓冲区即使它已经刷新到另一个流.

I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it is already flushed to another stream.

例如:

  1. BufferStream 实现了stream.Transform,并保留了一个512KB的内部环形缓冲区
  2. ReadableStreamA 被传送到 BufferStream
  3. 的一个实例
  4. BufferStream 写入其环形缓冲区,从 ReadableStreamA 中读取数据. (数据丢失无关紧要,因为缓冲区会覆盖旧数据.)
  5. BufferStream 被传送到 WritableStreamB
  6. WritableStreamB 接收整个 512KB 缓冲区,并继续获取从 ReadableStreamABufferStream 写入的数据.
  7. BufferStream 被传送到 WritableStreamC
  8. WritableStreamC 也接收整个 512KB 的缓冲区,但是这个缓冲区现在与 WritableStreamB 接收的不同,因为更多的数据已经写入 BufferStream.
  1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer
  2. ReadableStreamA is piped to an instance of BufferStream
  3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
  4. BufferStream is piped to WritableStreamB
  5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
  6. BufferStream is piped to WritableStreamC
  7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

这可以通过流 API 实现吗?我能想到的唯一方法是使用一种方法创建一个对象,该方法为每个目的地启动一个新的 PassThrough 流,这意味着我不能简单地通过管道传入和传出它.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.

就其价值而言,我已经使用旧的流动"API 通过简单地侦听 data 事件上的新处理程序来完成此操作.当使用 .on('data') 附加一个新函数时,我会直接使用环形缓冲区的副本调用它.

For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.

推荐答案

这是我对您的问题的看法.

Here's my take on your issue.

基本思想是创建一个 Transform 流,这将允许我们在将数据发送到流的输出之前执行您的自定义缓冲逻辑:

The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)

  this.push(chunk)
  done()
}

然后,我们需要重写 .pipe() 方法,以便我们在 BufferStream 被管道传输到流时收到通知,这允许我们自动编写数据给它:

Then, we need to override the .pipe() method so that we are are notified when the BufferStream is piped into a stream, which allows us to automatically write data to it:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

这样,当我们写buffer.pipe(someStream)时,我们按预期执行管道并将内部缓冲区写入输出流.之后,Transform 类会处理一切,同时跟踪背压等.

In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.

这是一个工作要点.请注意,我没有费心编写正确的缓冲逻辑(即,我不关心内部缓冲区的大小),但这应该很容易修复.

Here is a working gist. Please note that I didn't bother writing a correct buffering logic (ie. I don't care about the size of the internal buffer), but this should be easy to fix.

这篇关于实现缓冲转换流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆