如何将数据从函数中推送到可读流? [英] How do you push data to a readable stream from within a function?

查看:116
本文介绍了如何将数据从函数中推送到可读流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在努力实现以下目标:

I'm trying to achieve the following:


  • 功能 getPaths 读取目录路径并将它们推送到可读流中,因为它们找到它们

  • 可读 stream在接收路径时将传入路径传输到 write 流中。

  • Function getPaths reads directory paths and pushes them into readable stream as it finds them
  • The readable stream keeps piping (streaming) incoming paths into the write stream as it receives the paths.
const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read(item) {
    this.push(item)
  }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir.sync(dir, {"max_depth": 0, "track_inodes": true}, (path, stat) => {
    readable.push(path)
    console.log('pushing a path to readable')
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')
readable.push(null)  // indicates the end of the stream



问题



路径没有被压缩并以 getPaths 函数找到它们并将它们推送到流中,直到它找到所有它们才会发生。我知道这可能是因为该过程是同步的,但无法弄清楚如何使其工作。

Problem

The paths are not being compressed and written to the file as the getPaths function finds them and pushes them into the stream, it doesn't happen until it has found all of them. I know it's probably due to the process being synchronous but cannot figure out how to make it work.

我从日志中看到以下输出:

I see the following output from the logs:

> // .gz file gets created with size of 0
> // Nothing happens for about 1 minute
> x(184206803) "pushing a path to readable"
> "getPaths() ran"
> // I see the data started being written into the file
> "Done"



更新:



如果我像这样异步执行此操作(或使用下面答案中的代码):

UPDATE:

And if I do this asynchronously like this (or use the code from the answer below):

let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 

  ...

  // readable.push(null) 

我收到错误(我认为,当你完成数据推送后它没有收到预期的数据块时会抛出那个特定的错误。如果从代码中删除最后一行: readable.push(null),并尝试再次运行代码,它会抛出相同的错误):

I get an error (I think, it throws that particular error when it doesn't receive expected data chunk after you're done pushing data into it. If you remove that last line from the code: readable.push(null), and try to run the code again it throws the same error):

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type
 string or Buffer. Received type number


推荐答案

你的代码非常好,工作正常。你只需要删除 this.push(item)并设置读取函数,空体。

Your code is very good and works fine. You just need to remove this.push(item) and set read function with empty body.

这是一个工作片段

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read() { }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')

BTW,正确的参数名称是 read(size )。它代表要读取的字节数

BTW, the right argument name is read(size). It represents the number of bytes to read

编辑
无需可读流。您可以直接写入zip。

EDIT There is no need for the readable stream. You can write directly to zip.

const fs = require('fs');
const zlib = require('zlib');
const zip = zlib.createGzip();
const wstream = fs.createWriteStream('C:/test/file.txt.gz');

zip.pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    zip.write(path);
  })
  walker.on('end', (path, stat) => {
    zip.end();
  })
}
getPaths("C:/")
console.log('getPaths() ran')

这篇关于如何将数据从函数中推送到可读流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆