从两个管道流创建Node.js流 [英] Creating a Node.js stream from two piped streams

查看:68
本文介绍了从两个管道流创建Node.js流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果可能的话,我想通过管道将两个Node.js流合并为一个.我正在使用转换流.

I'd like to combine two Node.js streams into one by piping them, if possible. I'm using Transform streams.

换句话说,我希望我的图书馆返回myStream供人们使用.例如,他们可以写:

In other words, I'd like my library to return myStream for people to use. For example they could write:

process.stdin.pipe(myStream).pipe(process.stdout);

在内部,我使用的是第三方vendorStream,它可以完成一些工作,并插入到myInternalStream中包含的我自己的逻辑中.因此,以上内容将转换为:

And internally I'm using a third-party vendorStream that does some work, plugged into my own logic contained in myInternalStream. So what's above would translate to:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

我可以做类似的事情吗?我已经尝试过var myStream = vendorStream.pipe(myInternalStream),但是显然不起作用.

Can I do something like that? I've tried var myStream = vendorStream.pipe(myInternalStream) but that obviously doesn't work.

为了与bash作类比,假设我想编写一个程序来检查某个流(tail -n 1 | grep h)的最后一行中是否存在字母h,我可以创建一个shell脚本:

To make an analogy with bash, let's say I want to write a program that checks if the letter h is present in the last line of some stream (tail -n 1 | grep h), I can create a shell script:

# myscript.sh
tail -n 1 | grep h

然后,如果有人这样做:

And then if people do:

$ printf "abc\ndef\nghi" | . myscript.sh

它可以正常工作.

这是我到目前为止所拥有的:

This is what I have so far:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

这有可能吗?让我知道我要做什么不清楚.

Is this at all possible? Let me know if what I'm trying to do isn't clear.

谢谢!

推荐答案

您可以查看要通过管道传输到您的流中的内容,然后unpipe将其通过管道传输到您感兴趣的流中:

You can watch for something to be piped to your stream, and then unpipe it and pipe it to the streams you're interested in:

var PassThrough = require('stream').PassThrough;

var stream3 = new PassThrough();

// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
  source.unpipe(this);
  this.transformStream = source.pipe(stream1).pipe(stream2);
});

// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
  return this.transformStream.pipe(destination, options);
};

stdin.pipe(stream3).pipe(stdout);

您可以将此功能提取到您自己的可构造流类中:

You can extract this functionality into your own constructable stream class:

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner = function() {
  this.streams = Array.prototype.slice.apply(arguments);

  this.on('pipe', function(source) {
    source.unpipe(this);
    for(i in this.streams) {
      source = source.pipe(this.streams[i]);
    }
    this.transformStream = source;
  });
};

util.inherits(StreamCombiner, PassThrough);

StreamCombiner.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};

var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);

这篇关于从两个管道流创建Node.js流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆