如何在 node.js 可读流中调用异步函数 [英] How to call an asynchronous function inside a node.js readable stream

查看:29
本文介绍了如何在 node.js 可读流中调用异步函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是一个自定义可读流的实现的简短示例.该类称为 MyStream.流从目录中获取文件/文件夹名并将值推送到数据事件.

This is a short example of the implementation of a custom readable stream. The class is called MyStream. The stream gets the file/foldernames out of a directory and pushes the values to the data-event.

为了比较我实现的(在本例中)两种不同的方式/功能.一个是同步的,另一个是异步的.构造函数的第二个参数让你决定使用哪种方式(异步为真,同步为假.

To compare I implemented (in this example) two different ways/functions. One is syncronous and the other is asynchronous. The second argument of the constructor lets you decide, which way is used (true for the asynchronous and false for synchronous.

readcounter统计方法_read被调用的次数.只是为了提供反馈.

The readcounter counts the number of times the method _read is called. Just to give a feedback.

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);

function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}

MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    that = this;
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    files = fs.readdirSync(this.dirpath)
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    };
    this.push(null);
  }
};
//Instance for a asynchronous call
mystream = new MyStream('C:\Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

同步方式按预期工作,但是当我异步调用它时,发生了一些有趣的事情.每次通过 that.push(files[i]) 推送文件名时,都会再次调用 _read 方法.当第一个异步循环完成并且 that.push(null) 定义流的结尾时,这会导致错误.

The synchronous way works like expected, but something interesting is happening, when I call it asynchronously. Everytime the filename is pushed via that.push(files[i]) the _read method is called again. Which causes errors, when the first asynchronous loop is finished and that.push(null) defines the end of the stream.

我用来测试的环境:node 4.1.1,Electron 0.35.2.

The enviroment I am using to test this: node 4.1.1, Electron 0.35.2.

我不明白为什么 _read 如此频繁地被调用以及为什么会发生这种情况.也许这是一个错误?或者有什么我现在看不到的东西.有没有办法使用异步函数构建可读流?异步推送块会非常酷,因为它是非阻塞流方式.特别是当您有大量数据时.

I do not understand why _read is called so ofthen and why this is happening. Maybe it is a bug? Or is there somthing I do not see at the moment. Is there a way to build a readable stream by using asynchronous functions? To push the chunks asynchronously would be really cool, because it would be the non blocking stream way. Specially when you have bigger amount of data.

推荐答案

_read 在读者"需要数据时被调用,通常在你推送数据之后发生.

_read is called whenever the "reader" needs data and it usually happens just after you push data.

我在直接实现 _read 时遇到了同样的问题",所以现在,我编写了一个返回流对象的函数.它工作得很好,并且无法从我的流中拉出"数据,当我决定数据时,数据是可用的/推送的.用你的例子,我会这样做:

I had the same sort of "issues" with implementing _read directly so now, I write a function returning a stream object. It works quite good and data can't be "pulled" from my stream, data is avalaible/pushed when I decide it. With your example, I would do it like this:

var Readable = require('stream').Readable;
var fs = require('fs');

function MyStream(dirpath, async, opt) {
  var rs = new Readable();
  // needed to avoid "Not implemented" exception
  rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder);
  };

  var counter = 0;
  var readcounter = 0;

  if (async) {
    console.log("Readcounter: " + readcounter);
    fs.readdir(dirpath, function (err, files) {
      counter++;
      console.log("Counter: " + counter);
      for (var i = 0; i < files.length; i++) {
        rs.push(files[i]);
      }
      rs.push(null);
    });
  } else {
    console.log("Readcounter: " + readcounter);
    files = fs.readdirSync(dirpath)
    for (var i = 0; i < files.length; i++) {
      rs.push(files[i]);
    };
    rs.push(null);
  }

  return rs;
}

var mystream = MyStream('C:\Users', true);
mystream.on('data', function (chunk) {
  console.log(chunk.toString());
});

它不会直接回答您的问题,但它是一种获取工作代码的方法.

It doesn't directly answer your question but it's a way to get a working code.

这篇关于如何在 node.js 可读流中调用异步函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆