将缓冲区分解为rxjs大小 [英] break up buffer into size rxjs
问题描述
我每次都有一个可观察到的从流中获取数据,大小分别为512,我必须将其分解为其他可观察到的200个字符,并将[12] char保留在其他缓冲区中以与下一个块连接,我通过使用新主题和for循环,我相信可能会有更好,更漂亮的解决方案.
I have an observable get data from stream each time at size 512 each next I have to break it up to 200 char at other observable and keep [12] char in other buffer to concatenate with next block, I solve it by using new subject and for loop, I believe there maybe a better, more pretty solution.
可观察到的----------------------------------------
received Observable ----------------------------------------
- 下一个第一[512] -------> [112] [200] [200] -------> [200] [200]
- 第二个下一个[512] [ 112 ]-> [24] [200] [200] [88+ 112 ]-> [200] [200 ]
- 下一个第三[512] [24]-> [136] [200] [76 + 124] .....
-
第n次迭代[512] [194]-> [106] [200] [200] [106 + 94]-> [200] [200] [200]
- 1st next [512] -------> [112] [200] [200] -------> [200] [200]
- 2nd next [512][112] --> [24][200][200] [88+112] --> [200] [200]
- 3rd next [512][24] --> [136] [200] [76+124] .....
nth iteration [512][194] --> [106][200][200][106+94] --> [200][200][200]
第n + 1个[512] [6] .......
n+1th [512][6].......
maxValueSize = 200
this._sreamRecord$.subscribe(
{
next: (val) => {
const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
if (bufferToSend.length - i > maxValueSize) {
bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
} else {
completationBuffer = bufferToSend.slice(i, i + maxValueSize)
}
}
},
complete() {
if (completationBuffer.length) {
bufferStreamer.next(completationBuffer)
}
bufferStreamer.complete()
}
})
推荐答案
您可能需要考虑以下方法
You may want to consider a solution along these lines
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
let f = splitInChunksWithRemainder([]);
this._sreamRecord$.pipe(
switchMap(s => {
const res = f(s);
f = splitInChunksWithRemainder(res.newRemainder);
return from(res.chunks);
})
)
.subscribe(console.log);
这个想法是在连接前一个 remainder 后,用lodash
chunk
函数分割每个streamRecord
,即,前一个streamRecord
的分割后作为尾部保留的数组.
The idea is to split each streamRecord
with lodash
chunk
function after having concatenated the previous remainder, i.e. the array left as tail from the split of the previous streamRecord
.
这是通过函数splitInChunksWithRemainder
完成的,该函数是更高级别的函数,即返回函数的函数,在这种情况下,设置了remainder
来自上一个拆分的结果
This is done using the function splitInChunksWithRemainder
, which is an higher level function, i.e. a function which returns a function, in this case after having set the remainder
coming from the previous split.
评论后更新
如果您还需要发出 last newRemainder
,则可以考虑以下更复杂的解决方案
If you need to emit also the last newRemainder
, than you can consider a slightly more complex solution such as the following
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
const pipeableChain = () => (source: Observable<any>) => {
let f = splitInChunksWithRemainder([]);
let lastRemainder: any[];
return source.pipe(
switchMap(s => {
const res = f(s);
lastRemainder = res.newRemainder;
f = splitInChunksWithRemainder(lastRemainder);
return from(res.chunks);
}),
concat(defer(() => of(lastRemainder)))
)
}
_streamRecord$.pipe(
pipeableChain()
)
.subscribe(console.log);
我们引入了pipeableChain
功能.在此函数中,我们保存通过执行splitInChunksWithRemainder
返回的余数.源Observable完成后,我们将通过concat
运算符添加最后一条通知.
如您所见,我们还必须使用defer
运算符来确保仅在Observer订阅时(即在源Observable完成之后)创建Observable.如果没有defer
,则在最初订阅源Observable时(即,当lastRemainder
仍未定义时)将创建作为参数传递给concat
的Observable.
We have introduced the pipeableChain
function. In this function we save the remainder which is returned by the execution of splitInChunksWithRemainder
. Once the source Observable completes, we add a last notification via the concat
operator.
As you see, we have also to use the defer
operator to make sure we create the Observable only when the Observer subscribes, i.e. after the source Observable completes. Without defer
the Observable passed to concat
as parameter would be created when the source Observable is initially subscribed, i.e. when lastRemainder
is still undefined.
这篇关于将缓冲区分解为rxjs大小的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!