链式承诺中 RxJS Observable 的执行顺序 [英] order of execution for RxJS Observable from chained promises

查看:40
本文介绍了链式承诺中 RxJS Observable 的执行顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 AnuglarJS 2 应用程序中,我想从链式承诺中创建 Observable.由于承诺提供一次性结果,第一步是使用 Observable.fromPromise() 然后在元可观察对象上使用 mapAll() 来处理 complete每个 fromPromise.我在这里发现这个问题很有用 RxJS: How to have一个观察者处理多个观察者?

In an AnuglarJS 2 application I'd like to make Observable from chained promises. As promises provide one time result the first step was to use Observable.fromPromise() and then mapAll() on meta observable to deal with complete of each fromPromise. I've here found useful this SO question RxJS: How to have one Observer process multiple Observables?

由于上述问题的公认答案解决了简单事件,因此我使用 Observable.fromPromise(someComposedPromise) 代替 Observable.fromEvent(someEvent) 轻松准备了自己的解决方案>.不幸的是,虽然对于简单的单个 Promise 来说一切都很好,但由于 Promise 被解决的顺序,当 Promise 由两个 Promise 组成时就会出现问题.

As the accepted answer from above question addresses simple events, I've easily prepared own solution using Observable.fromPromise(someComposedPromise) in place of Observable.fromEvent(someEvent). Unfortunately while all works fine for simple single promise, the problem arises where promise is composed from two promises because of the the order in which promises are resolved.

为了简单和案例隔离,假设我们有一些现有的外部 DumbCache(我想使用的是 Ionic 2 LocalStorage 其中最简单的变体看起来与此类似):

For the sake of simplicity and case isolation lets assume we have some existing external DumbCache (and whats I'd like to use is Ionic 2 LocalStorage where the most simple variant looks similar to this one):

class DumbCache {
  cache = {};

  get(key) {
    return new Promise((resolve, reject) => {
      var value = this.cache[key];
      resolve(value);
    });
  }

  set(key, value) {
    return new Promise((resolve, reject) => {
      this.cache[key] = value;
      resolve();
    });
  }
}

那么上面描述的方法是:

then the approach described above is:

class CacheValueObservable {
  private cache: DumbCache;

  constructor(private key: string) {
    this.cache = new DumbCache();
  }

  /*
   * meta observer to handle promises from observables with all results and errors
   * thanks to ReplaySubject(1) current value is available immediately after subscribing 
   */
  private _valueSource$$ = new Rx.ReplaySubject(1);
  private _value$ = this._valueSource$$.mergeAll();

  public value$() { return this._value$; }

  public updateValue(value) {
    this._valueSource$$.next(
      Rx.Observable.fromPromise(
        this.cache.set(this.key, value)
        .then(() => this.cache.get(this.key))
      )
    );
  }
}

现在为以下代码:

let cacheValueObservable = new CacheValueObservable("TEST_KEY");
cacheValueObservable.updateValue('VALUE 0');

cacheValueObservable.value$().subscribe(
    val => {
      console.log('val:' + val);
    },
    val => console.log('err', val.stack),
    () => (console.log('complete'))
);

cacheValueObservable.updateValue('VALUE 1');
cacheValueObservable.updateValue('VALUE 2');

console.log('end');

结果是:

starting...
end
val:VALUE 2
val:VALUE 2
val:VALUE 2

虽然显然我想实现

starting...
end
val:VALUE 0
val:VALUE 1
val:VALUE 2

完整示例:http://jsbin.com/wiheki/edit?js,console

推荐答案

在试图表达要详细描述的问题的同时,我仍在调查和更好地理解该问题.主要的一点是第一个承诺 this.cache.set(this.key, value) 实际上可能会立即被解决,而 Observable.fromPromise 不保证所有的顺序正在解决以下链式承诺.

While trying to express the question to be well described I was still investigating and understanding the issue better and better. The main point is that first promise this.cache.set(this.key, value) may be actually immediately resolved while Observable.fromPromise makes no guarantee in which order all the following chained promises are being resolved.

问题是由于每个链的最后一个承诺只有在最后一个链的第一个承诺改变状态后才执行(因此 VALUE 2).

The problem was caused by the fact that the last promise from each chain was executed only after state was altered by first promise of the last chain (thus VALUE 2).

毕竟从代码的角度来看,解决方案看起来很简单,但它并不那么明显,因为它包含两个关键更改:

After all the solution looks pretty simple from code point of view yet it is not that obvious as consist of two key changes:

  • 使用 Observable.defer 而不是 Observable.fromPromise
  • 将初始 Promise 执行推迟到 mergeAll 阶段
  • 使用 mergeAll(1)
  • 限制(或实际上禁用)合并 promise 的并发性
  • defer the initial promise execution until mergeAll phase by using Observable.defer instead of Observable.fromPromise
  • limit (or actually disable) the concurrency of merging the promises by using mergeAll(1)

因此可行的解决方案如下所示:

thus working solution looks like this:

class CacheValueObservable {
  private cache: DumbCache;

  constructor(private key: string) {
    this.cache = new DumbCache();
  }

  /*
   * meta observer to handle promises from observables with all results and errors
   * thanks to ReplaySubject(1) current value is available immediately after subscribing 
   */
  private _valueSource$$ = new Rx.ReplaySubject(1);
  // disable merge concurrency 
  private _value$ = this._valueSource$$.mergeAll(1);

  public value$() { return this._value$; }

  public updateValue(value) {
    this._valueSource$$.next(
      // defer promise resolution to ensure they will be fully resolved
      // one by one thanks to no concurrency in mergeAll
      Rx.Observable.defer(() =>
        this.cache.set(this.key, value)
        .then(() => this.cache.get(this.key))
      )
    );
  }
}

这是现场示例:http://jsbin.com/bikawo/edit?html,js,console

这篇关于链式承诺中 RxJS Observable 的执行顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆