将缓冲区分解为rxjs大小 [英] break up buffer into size rxjs

查看:108
本文介绍了将缓冲区分解为rxjs大小的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我每次都有一个可观察到的从流中获取数据,大小分别为512,我必须将其分解为其他可观察到的200个字符,并将[12] char保留在其他缓冲区中以与下一个块连接,我通过使用新主题和for循环,我相信可能会有更好,更漂亮的解决方案.

I have an observable get data from stream each time at size 512 each next I have to break it up to 200 char at other observable and keep [12] char in other buffer to concatenate with next block, I solve it by using new subject and for loop, I believe there maybe a better, more pretty solution.

可观察到的----------------------------------------

received Observable ----------------------------------------

  • 下一个第一[512] -------> [112] [200] [200] -------> [200] [200]
  • 第二个下一个[512] [ 112 ]-> [24] [200] [200] [88+ 112 ]-> [200] [200 ]
  • 下一个第三[512] [24]-> [136] [200] [76 + 124] .....
  • 第n次迭代[512] [194]-> [106] [200] [200] [106 + 94]-> [200] [200] [200]

  • 1st next [512] -------> [112] [200] [200] -------> [200] [200]
  • 2nd next [512][112] --> [24][200][200] [88+112] --> [200] [200]
  • 3rd next [512][24] --> [136] [200] [76+124] .....
  • nth iteration [512][194] --> [106][200][200][106+94] --> [200][200][200]

第n + 1个[512] [6] .......

n+1th [512][6].......

maxValueSize = 200
this._sreamRecord$.subscribe(
    {
        next: (val) => {
            const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
            for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
                if (bufferToSend.length - i > maxValueSize) {
                    bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
                } else {
                    completationBuffer = bufferToSend.slice(i, i + maxValueSize)
                }
            }
        },
        complete() {
            if (completationBuffer.length) {
                bufferStreamer.next(completationBuffer)
            }
            bufferStreamer.complete()
        }
    })

推荐答案

您可能需要考虑以下方法

You may want to consider a solution along these lines

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

let f = splitInChunksWithRemainder([]);

this._sreamRecord$.pipe(
    switchMap(s => {
        const res = f(s);
        f = splitInChunksWithRemainder(res.newRemainder);
        return from(res.chunks);
    })
)
.subscribe(console.log);

这个想法是在连接前一个 remainder 后,用lodash chunk函数分割每个streamRecord,即,前一个streamRecord的分割后作为尾部保留的数组.

The idea is to split each streamRecord with lodash chunk function after having concatenated the previous remainder, i.e. the array left as tail from the split of the previous streamRecord.

这是通过函数splitInChunksWithRemainder完成的,该函数是更高级别的函数,即返回函数的函数,在这种情况下,设置了remainder来自上一个拆分的结果

This is done using the function splitInChunksWithRemainder, which is an higher level function, i.e. a function which returns a function, in this case after having set the remainder coming from the previous split.

评论后更新

如果您还需要发出 last newRemainder,则可以考虑以下更复杂的解决方案

If you need to emit also the last newRemainder, than you can consider a slightly more complex solution such as the following

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

const pipeableChain = () => (source: Observable<any>) => {
    let f = splitInChunksWithRemainder([]);
    let lastRemainder: any[];
    return source.pipe(
        switchMap(s => {
            const res = f(s);
            lastRemainder = res.newRemainder;
            f = splitInChunksWithRemainder(lastRemainder);
            return from(res.chunks);
        }),
        concat(defer(() => of(lastRemainder)))
    )
}

_streamRecord$.pipe(
    pipeableChain()
)
.subscribe(console.log);

我们引入了pipeableChain功能.在此函数中,我们保存通过执行splitInChunksWithRemainder返回的余数.源Observable完成后,我们将通过concat运算符添加最后一条通知. 如您所见,我们还必须使用defer运算符来确保仅在Observer订阅时(即在源Observable完成之后)创建Observable.如果没有defer,则在最初订阅源Observable时(即,当lastRemainder仍未定义时)将创建作为参数传递给concat的Observable.

We have introduced the pipeableChain function. In this function we save the remainder which is returned by the execution of splitInChunksWithRemainder. Once the source Observable completes, we add a last notification via the concat operator. As you see, we have also to use the defer operator to make sure we create the Observable only when the Observer subscribes, i.e. after the source Observable completes. Without defer the Observable passed to concat as parameter would be created when the source Observable is initially subscribed, i.e. when lastRemainder is still undefined.

这篇关于将缓冲区分解为rxjs大小的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆