如何创建可在单项和批处理模式之间切换的Rx(RxJS)流? [英] How to create an Rx (RxJS) stream that can be switched between single-item and batch-mode?

查看:83
本文介绍了如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Rx流,它是来自特定组件的传出更改源。

I have an Rx stream that is an outgoing change feed from a particular component.

有时我希望能够进入批处理模式在流上,以便传递给 onNext 的项目累积并仅在退出批处理模式时传递。

Occasionally I want to be able to enter a batch mode on the stream so that items passed to onNext accumulate and are passed on only when the batch mode is exited.

通常我会通过流传递单个项目:

Normally I'll pass single items through the stream:

stream.onNext(1); 
stream.onNext(2);

传递给 onNext的项目之间存在一对一的映射以及订阅中收到的项目,因此前一个代码段会导致两次调用 subscribe ,其值为1和2.

There is a 1-to-1 mapping between the items passed to onNext and the items received in the subscribe, so the previous snippet results in two calls to subscribe with the the values 1 and 2.

我正在寻找的批处理模式可能是这样的:

The batch mode I am looking for might work something like this:

stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();

在这种情况下,我希望 subscribe 只能用单一的方式调用一次连接数组[1,2]。

In this case I want subscribe to only be invoked once with the single concatenated array [1, 2].

重申一下,我有时只需要批处理模式,有时我会通过非连接项目。

Just to reiterate, I need batch mode only sometimes, at other times I pass the non-concatenated items through.

如何使用Rx实现此行为?

How can I achieve this behaviour with Rx?

注意:以前我通过 onNext 虽然这主要是因为在单独模式和批处理模式下类型保持不变。

NOTE: Previously I was passing arrays through onNext although this is mostly so that the type remains the same when in individual mode and when in batch-mode.

例如:

stream.onNext([1, 2, 3]); 
stream.onNext([4, 5, 6]);

订阅收到[1,2,3]然后[4,5,6],但在批处理模式订阅接收连接结果[1,2,3,4,5,6]。

Subscribe receives [1, 2, 3] then [4, 5, 6], but when in batch-mode subscribe receives the concatenated results [1, 2, 3, 4, 5, 6].

因此,当你以这种方式看待它时,它更像是非连接的连接的模式(而不是个人批处理模式)。但我认为问题非常相似。

So when you look at it this way it's more like unconcatenated and concatenated modes (as opposed to individual and batch-mode). But I think the problem is very similar.

推荐答案

以下是James World的c#解决方案转换为JavaScript并为您量身定制的精神特定问题。

Here is the spirit of James World's c# solution converted to JavaScript and tailored to your specific question.

var source = new Rx.Subject();
source.batchMode = new Rx.BehaviorSubject(false);
source.enterBatchMode = function() { this.batchMode.onNext(true); };
source.exitBatchMode = function() { this.batchMode.onNext(false); };
var stream = source
  .window(source.batchMode
    .distinctUntilChanged()
    .skipWhile(function(mode) { return !mode; }))
  .map(function(window, i) {
    if ((i % 2) === 0) {
      // even windows are unbatched windows
      return window;
    }

    // odd windows are batched
    // collect the entries in an array
    // (e.g. an array of arrays)
    // then use Array.concat() to join
    // them together
    return window
      .toArray()
      .filter(function(array) { return array.length > 0; })
      .map(function(array) {
        return Array.prototype.concat.apply([], array);
      });
  })
  .concatAll();


stream.subscribe(function(value) {
  console.log("received ", JSON.stringify(value));
});

source.onNext([1, 2, 3]);

source.enterBatchMode();
source.onNext([4]);
source.onNext([5, 6]);
source.onNext([7, 8, 9]);
source.exitBatchMode();

source.onNext([10, 11]);

source.enterBatchMode();
source.exitBatchMode();

source.onNext([12]);

source.enterBatchMode();
source.onNext([13]);
source.onNext([14, 15]);
source.exitBatchMode();

<script src="https://getfirebug.com/firebug-lite-debug.js"></script>
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>

这篇关于如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆