Node.js:将流内容拆分为n个部分 [英] Node.js: splitting stream content for n-parts

查看:99
本文介绍了Node.js:将流内容拆分为n个部分的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图了解节点流及其生命周期.因此,我想将流的内容分为n部分.下面的代码仅用于解释我的意图,并表明我已经尝试过一些自己的事情.我省略了一些细节

I'm trying to understand node streams and their life-cycle. So, I want to split the content of a stream for n-parts. The code below is just to explain my intentions and to show that I already try something by myself. I omitted some details

我有一个流,它仅生成一些数据(只是一个数字序列):

I have a stream which just generates some data(just a sequence of numbers):

class Stream extends Readable {
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    this.counter = 0
  }

  _read(size) {
    if(this.counter === 30) {
      this.push(null)
    } else {
      this.push(this.counter)
    }
    this.counter += 1
  }
}

const stream = new Stream()
stream.pause();

试图获取n个下一个区块的函数:

a function which tries to take n next chunks:

function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      var chunk;
      do {
        chunk = stream.read()
        if (_.isNull(chunk) || result.length > count) {
          stream.pause()
          break
        }
        result.push(chunk)
      } while(true)
      resolve(result)
    })
  })
}

并想像这样使用它:

take(stream, 3)
  .then(res => {
    assert.deepEqual(res, [1, 2, 3])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [4, 5, 6])
  })

惯用的方法是什么?

推荐答案

使用ReadableStream,您可以使用单个函数来检查当前数据块的元素是否等于预期结果.

Using ReadableStream you could use a single function to check if elements of current chunk of data is equal to expected result.

创建变量CHUNKN,其中CHUNK是要从原始数组中切片或拼接的元素数,N是在pull()致电.

Create variables, CHUNK and N, where CHUNK is the number of elements to slice or splice from original array, N is the variable incremented by CHUNK at each .enqueue() call within pull() call.

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));

使用链接的.then()

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(({value, done}) => {
  if ([1,2,3].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read()
  }
})
.then(({value, done}) => {
  if ([4,5,6].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // return `result`; `reader.closed` returns a `Promise`
    return reader.closed.then(() => result);
  }
})
.then(readComplete)
.catch(err => console.log(err));

另请参见 Chrome内存问题-File API + AngularJS

这篇关于Node.js:将流内容拆分为n个部分的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆