Node.js:将流内容拆分为n个部分 [英] Node.js: splitting stream content for n-parts
问题描述
我试图了解节点流及其生命周期.因此,我想将流的内容分为n部分.下面的代码仅用于解释我的意图,并表明我已经尝试过一些自己的事情.我省略了一些细节
I'm trying to understand node streams and their life-cycle. So, I want to split the content of a stream for n-parts. The code below is just to explain my intentions and to show that I already try something by myself. I omitted some details
我有一个流,它仅生成一些数据(只是一个数字序列):
I have a stream which just generates some data(just a sequence of numbers):
class Stream extends Readable {
constructor() {
super({objectMode: true, highWaterMark: 1})
this.counter = 0
}
_read(size) {
if(this.counter === 30) {
this.push(null)
} else {
this.push(this.counter)
}
this.counter += 1
}
}
const stream = new Stream()
stream.pause();
试图获取n个下一个区块的函数:
a function which tries to take n next chunks:
function take(stream, count) {
const result = []
return new Promise(function(resolve) {
stream.once('readable', function() {
var chunk;
do {
chunk = stream.read()
if (_.isNull(chunk) || result.length > count) {
stream.pause()
break
}
result.push(chunk)
} while(true)
resolve(result)
})
})
}
并想像这样使用它:
take(stream, 3)
.then(res => {
assert.deepEqual(res, [1, 2, 3])
return take(stream, 3)
})
.then(res => {
assert.deepEqual(res, [4, 5, 6])
})
惯用的方法是什么?
推荐答案
使用ReadableStream
,您可以使用单个函数来检查当前数据块的元素是否等于预期结果.
Using ReadableStream
you could use a single function to check if elements of current chunk of data is equal to expected result.
创建变量CHUNK
和N
,其中CHUNK
是要从原始数组中切片或拼接的元素数,N
是在pull()
致电.
Create variables, CHUNK
and N
, where CHUNK
is the number of elements to slice or splice from original array, N
is the variable incremented by CHUNK
at each .enqueue()
call within pull()
call.
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
let N = 0;
const stream = new ReadableStream({
pull(controller) {
if (N < data.length)
// slice `N, N += CHUNK` elements from `data`
controller.enqueue(data.slice(N, N += CHUNK))
else
// if `N` is equal to `data.length` call `.close()` on stream
controller.close()
}
});
const reader = stream.getReader();
const processData = ({value, done}) => {
// if stream is closed return `result`; `reader.closed` returns a `Promise`
if (done) return reader.closed.then(() => result);
if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read().then(data => processData(data))
}
}
const readComplete = res => console.log(`result: [${res}]`);
reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));
使用链接的.then()
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
let N = 0;
const stream = new ReadableStream({
pull(controller) {
if (N < data.length)
// slice `N, N += CHUNK` elements from `data`
controller.enqueue(data.slice(N, N += CHUNK))
else
// if `N` is equal to `data.length` call `.close()` on stream
controller.close()
}
});
const reader = stream.getReader();
const processData = ({value, done}) => {
// if stream is closed return `result`; `reader.closed` returns a `Promise`
if (done) return reader.closed.then(() => result);
if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read().then(data => processData(data))
}
}
const readComplete = res => console.log(`result: [${res}]`);
reader.read()
.then(({value, done}) => {
if ([1,2,3].every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read()
}
})
.then(({value, done}) => {
if ([4,5,6].every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
// return `result`; `reader.closed` returns a `Promise`
return reader.closed.then(() => result);
}
})
.then(readComplete)
.catch(err => console.log(err));
另请参见 Chrome内存问题-File API + AngularJS
这篇关于Node.js:将流内容拆分为n个部分的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!