实现缓冲的转换流 [英] Implementing a buffered transform stream

查看:92
本文介绍了实现缓冲的转换流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用新的Node.js流API 实现一个流将缓冲一定数量的数据。当此流通过管道传输到另一个流时,或者某些内容消耗可读事件时,此流应刷新其缓冲区,然后简单地成为传递。问题是,此流将通过管道传输到许多其他流,并且当连接每个目标流时,即使已将其刷新到另一个流,也必须刷新缓冲区

I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it is already flushed to another stream.

例如:


  1. BufferStream implements stream.Transform ,并保留512KB内部环形缓冲区

  2. ReadableStreamA 是管道输送到 BufferStream的实例

  3. BufferStream 写入其环形缓冲区,读取来自 ReadableStreamA 的数据。(数据丢失并不重要,因为缓冲区会覆盖旧数据。)

  4. BufferStream 通过管道传输到 WritableStreamB

  5. WritableStreamB 接收整个512KB缓冲区,并继续从 ReadableStreamA 通过 BufferStream

  6. BufferStream 传送到 WritableSt reamC

  7. WritableStreamC 也接收整个512KB缓冲区,但此缓冲区现在不同于<$收到c $ c> WritableStreamB ,因为此后已将更多数据写入 BufferStream

  1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer
  2. ReadableStreamA is piped to an instance of BufferStream
  3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
  4. BufferStream is piped to WritableStreamB
  5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
  6. BufferStream is piped to WritableStreamC
  7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

这是否可以使用流API?我能想到的唯一方法是使用一种方法创建一个对象,该方法为每个目标创建一个新的PassThrough流,这意味着我不能简单地管道输入和输出。

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.

对于它的价值,我通过简单地在 data 事件上监听新的处理程序,使用旧的流动API完成了这项工作。当一个新函数附加了 .on('data')时,我会用环形缓冲区的副本直接调用它。

For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.

推荐答案

这是我对你的问题的看法。

Here's my take on your issue.

基本思路是创建转换流,这将允许我们在流输出上发送数据之前执行自定义缓冲逻辑:

The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)

  this.push(chunk)
  done()
}

然后,我们需要覆盖 .pipe()方法,以便在将 BufferStream 传送到流中时通知我们,这允许我们自动将数据写入其中:

Then, we need to override the .pipe() method so that we are are notified when the BufferStream is piped into a stream, which allows us to automatically write data to it:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

这样,当我们编写 buffer.pipe(someStream)时,我们按预期执行管道并将内部缓冲区写入输出流。之后,转换类会处理所有事情,同时跟踪背压等等。

In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.

这是工作要点。请注意,我没有打扰写一个正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易修复。

Here is a working gist. Please note that I didn't bother writing a correct buffering logic (ie. I don't care about the size of the internal buffer), but this should be easy to fix.

这篇关于实现缓冲的转换流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆