什么是处理node.js变换流的背压的正确方法? [英] What's the proper way to handle back-pressure in a node.js Transform stream?

查看:113
本文介绍了什么是处理node.js变换流的背压的正确方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我第一次编写node.js服务器端的冒险经历。到目前为止,这只是
的乐趣,但是我很难理解
实现与node.js流相关的正确方法。

These are my first adventures in writing node.js server side. It's been fun so far but I'm having some difficulty understanding the proper way to implement something as it relates to node.js streams.

出于测试和学习目的,我正在处理其
内容为的大型文件zlib压缩。压缩内容是二进制数据,每个
数据包的长度为38个字节。我正在尝试创建一个结果文件
看起来几乎与原始文件相同,只是每1024个38字节的数据包有一个
未压缩的31字节标头。

For test and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, each packet being 38 bytes in length. I'm trying to create a resulting file that looks almost identical to the original file except that there is an uncompressed 31 byte header for every 1024 38 byte packets.

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+



结果文件内容



resulting file content

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+

正如您所看到的,这有点像翻译问题。意思是,我是
将一些源流作为输入,然后将
稍微转换为某些输出流。因此,实现
转换流是很自然的。

As you can see, it's somewhat of a translation problem. Meaning, I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

该课程只是尝试完成以下任务:

The class simply attempts to accomplish the following:


  1. 将流作为输入

  2. zlib膨胀数据块以计算数据包数量,
    将其中1024个放在一起,zlib放气,
    预先添加标题。

  3. 通过管道通过
    this.push(块)传递新生成的块。

  1. Takes stream as input
  2. zlib inflates the chunks of data to count the number of packets, putting together 1024 of them, zlib deflating, and prepending a header.
  3. Passes the new resulting chunk on through the pipeline via this.push(chunk).

用例如下:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);



问题



假设转换对于这个用例来说是一个不错的选择,我似乎是在
遇到可能的背压问题。我在 _transform 中对 this.push(chunk)
的调用一直返回 false 。为什么会这样,以及
如何处理这些事情?

Question(s)

Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why would this be and how to handle such things?

推荐答案

我认为转换适用于此,但我会执行膨胀作为管道中的一个单独步骤。

I think Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

这是一个快速且基本未经测试的例子:

Here's a quick and largely untested example:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take first 1024 packets.
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    // TODO: handle `err`
    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers.
transformer._flush = function() {
  this._dump();
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));

这篇关于什么是处理node.js变换流的背压的正确方法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆