如何创建可在单项和批处理模式之间切换的Rx(RxJS)流? [英] How to create an Rx (RxJS) stream that can be switched between single-item and batch-mode?
问题描述
我有一个Rx流,它是来自特定组件的传出更改源。
I have an Rx stream that is an outgoing change feed from a particular component.
有时我希望能够进入批处理模式在流上,以便传递给 onNext 的项目累积并仅在退出批处理模式时传递。
Occasionally I want to be able to enter a batch mode on the stream so that items passed to onNext accumulate and are passed on only when the batch mode is exited.
通常我会通过流传递单个项目:
Normally I'll pass single items through the stream:
stream.onNext(1);
stream.onNext(2);
传递给 onNext的项目之间存在一对一的映射以及订阅中收到的项目,因此前一个代码段会导致两次调用 subscribe ,其值为1和2.
There is a 1-to-1 mapping between the items passed to onNext and the items received in the subscribe, so the previous snippet results in two calls to subscribe with the the values 1 and 2.
我正在寻找的批处理模式可能是这样的:
The batch mode I am looking for might work something like this:
stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();
在这种情况下,我希望 subscribe 只能用单一的方式调用一次连接数组[1,2]。
In this case I want subscribe to only be invoked once with the single concatenated array [1, 2].
重申一下,我有时只需要批处理模式,有时我会通过非连接项目。
Just to reiterate, I need batch mode only sometimes, at other times I pass the non-concatenated items through.
如何使用Rx实现此行为?
How can I achieve this behaviour with Rx?
注意:以前我通过 onNext 虽然这主要是因为在单独模式和批处理模式下类型保持不变。
NOTE: Previously I was passing arrays through onNext although this is mostly so that the type remains the same when in individual mode and when in batch-mode.
例如:
stream.onNext([1, 2, 3]);
stream.onNext([4, 5, 6]);
订阅收到[1,2,3]然后[4,5,6],但在批处理模式订阅接收连接结果[1,2,3,4,5,6]。
Subscribe receives [1, 2, 3] then [4, 5, 6], but when in batch-mode subscribe receives the concatenated results [1, 2, 3, 4, 5, 6].
因此,当你以这种方式看待它时,它更像是非连接的和连接的模式(而不是个人和批处理模式)。但我认为问题非常相似。
So when you look at it this way it's more like unconcatenated and concatenated modes (as opposed to individual and batch-mode). But I think the problem is very similar.
推荐答案
以下是James World的c#解决方案转换为JavaScript并为您量身定制的精神特定问题。
Here is the spirit of James World's c# solution converted to JavaScript and tailored to your specific question.
var source = new Rx.Subject();
source.batchMode = new Rx.BehaviorSubject(false);
source.enterBatchMode = function() { this.batchMode.onNext(true); };
source.exitBatchMode = function() { this.batchMode.onNext(false); };
var stream = source
.window(source.batchMode
.distinctUntilChanged()
.skipWhile(function(mode) { return !mode; }))
.map(function(window, i) {
if ((i % 2) === 0) {
// even windows are unbatched windows
return window;
}
// odd windows are batched
// collect the entries in an array
// (e.g. an array of arrays)
// then use Array.concat() to join
// them together
return window
.toArray()
.filter(function(array) { return array.length > 0; })
.map(function(array) {
return Array.prototype.concat.apply([], array);
});
})
.concatAll();
stream.subscribe(function(value) {
console.log("received ", JSON.stringify(value));
});
source.onNext([1, 2, 3]);
source.enterBatchMode();
source.onNext([4]);
source.onNext([5, 6]);
source.onNext([7, 8, 9]);
source.exitBatchMode();
source.onNext([10, 11]);
source.enterBatchMode();
source.exitBatchMode();
source.onNext([12]);
source.enterBatchMode();
source.onNext([13]);
source.onNext([14, 15]);
source.exitBatchMode();
<script src="https://getfirebug.com/firebug-lite-debug.js"></script>
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>
这篇关于如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!