如何调用node.js的可读流内部异步函数 [英] How to call an asynchronous function inside a node.js readable stream

查看:157
本文介绍了如何调用node.js的可读流内部异步函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是一个自定义的可读流实现的简单例子。该类称为MyStream。流失控目录的文件/ foldernames和推值到数据事件。

This is a short example of the implementation of a custom readable stream. The class is called MyStream. The stream gets the file/foldernames out of a directory and pushes the values to the data-event.

要比较我实现(在这个例子中)两种不同的方式/功能。一个是syncronous,另一个是异步的。构造函数的第二个参数可以让你决定使用哪种方式(同步异步真假。

To compare I implemented (in this example) two different ways/functions. One is syncronous and the other is asynchronous. The second argument of the constructor lets you decide, which way is used (true for the asynchronous and false for synchronous.

的readcounter计数的次数的方法_read被拨叫的号码。只是给一个反馈。

The readcounter counts the number of times the method _read is called. Just to give a feedback.

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);

function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}

MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    that = this;
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    files = fs.readdirSync(this.dirpath)
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    };
    this.push(null);
  }
};
//Instance for a asynchronous call
mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

在同步方式的工作方式类似预期,但一些有趣的事情发生了,当我异步调用它。每次的文件名是通过推that.push(文件[I])的_read方法再次被调用。这会导致错误,当第一异步循环结束后和 that.push(空)定义了数据流的结束。

The synchronous way works like expected, but something interesting is happening, when I call it asynchronously. Everytime the filename is pushed via that.push(files[i]) the _read method is called again. Which causes errors, when the first asynchronous loop is finished and that.push(null) defines the end of the stream.

我使用来测试这一点的环境:节点4.1.1,电子0.35.2

The enviroment I am using to test this: node 4.1.1, Electron 0.35.2.

我不明白为什么_read被称为所以ofthen和为什么发生这种情况。也许这是一个错误?还是有我的财产以后不要在此刻看到的。
有没有建立使用异步函数读取数据流的方法吗?为了异步推块是真的很酷,因为这将是无阻塞流的方式。特别是当你的数据量更大。

I do not understand why _read is called so ofthen and why this is happening. Maybe it is a bug? Or is there somthing I do not see at the moment. Is there a way to build a readable stream by using asynchronous functions? To push the chunks asynchronously would be really cool, because it would be the non blocking stream way. Specially when you have bigger amount of data.

推荐答案

_read 每当读者需要的数据,它通常发生你推数据只是后调用。

_read is called whenever the "reader" needs data and it usually happens just after you push data.

我有同样类型的问题与贯彻 _read 直接所以现在,我写一个函数返回一个流对象。它的工作原理相当不错,从我的数据流不能被拉,数据avalaible /推时,我决定了。随着你的榜样,我会做这样的:

I had the same sort of "issues" with implementing _read directly so now, I write a function returning a stream object. It works quite good and data can't be "pulled" from my stream, data is avalaible/pushed when I decide it. With your example, I would do it like this:

var Readable = require('stream').Readable;
var fs = require('fs');

function MyStream(dirpath, async, opt) {
  var rs = new Readable();
  // needed to avoid "Not implemented" exception
  rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder);
  };

  var counter = 0;
  var readcounter = 0;

  if (async) {
    console.log("Readcounter: " + readcounter);
    fs.readdir(dirpath, function (err, files) {
      counter++;
      console.log("Counter: " + counter);
      for (var i = 0; i < files.length; i++) {
        rs.push(files[i]);
      }
      rs.push(null);
    });
  } else {
    console.log("Readcounter: " + readcounter);
    files = fs.readdirSync(dirpath)
    for (var i = 0; i < files.length; i++) {
      rs.push(files[i]);
    };
    rs.push(null);
  }

  return rs;
}

var mystream = MyStream('C:\\Users', true);
mystream.on('data', function (chunk) {
  console.log(chunk.toString());
});

它不直接回答你的问题,但它是一种方式来获得一个工作code。

It doesn't directly answer your question but it's a way to get a working code.

这篇关于如何调用node.js的可读流内部异步函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆