如何将Promises序列转换为Rx.RxJS可观察到? [英] How to convert a sequence of Promises into Rx.Observable with RxJS?

查看:88
本文介绍了如何将Promises序列转换为Rx.RxJS可观察到?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用RxJS从Promises链中创建Rx.Observable.与此问题的区别在于,我的Promises数量未知,每个Promise都取决于上一个的结果.

I'm trying to create an Rx.Observable from a chain of Promises with RxJS. The difference from this question is that I have unknown number of Promises, and every Promise depends on the result of the previous one.

基本上,我有一系列页面,这些页面与下一页"链接相连.

Basically I have a sequence of pages, connected with "next page" links.

我希望函数执行的操作是:

What I want the function to do is:

  • 等待承诺<>
  • 提供结果(fire Observer.onNext())
  • 检查是否存在下一页链接
  • 使用该链接创建下一个Promise<>
  • 重复直到剩余页面

我尝试了以下操作:

private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {

    let observable = Rx.Observable.create<T>(async obs => {
        let page = await firstPromise;
        page.value.forEach(v => obs.onNext(v));

        while (page['@odata.nextLink']) {
            let nextPageUrl = <string>page['@odata.nextLink'];
            let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
            page = await nextPagePromise;
            page.value.forEach(v => obs.onNext(v));
        }

        obs.onCompleted();
    });

    return observable;
}

(IODataCollectionResult是OData结果,其中'@ odata.nextLink'是下一页的URL,.value是值的数组)

(IODataCollectionResult is a OData result, where '@odata.nextLink' is the next page url and .value is an array of values)

问题是,我无法使用TypeScript进行编译,这给了我一个错误:

The problem is I can't compile that with TypeScript, it gives me an error:

类型'(obs:Observer)=> Promise'的参数不能分配给类型'(observer:Observer)=> void |功能介绍IDisposable".

Argument of type '(obs: Observer) => Promise' is not assignable to parameter of type '(observer: Observer) => void | Function | IDisposable'.

这很有意义,因为异步函数返回的是Promise<void>而不是void.

Which makes sense, because async function returns a Promise<void>, not a void.

这是否意味着我不能对Rx.Observable.create()使用async/await?如何将一系列Promises链接到Observable中?

Does it mean I cannot use async/await with the Rx.Observable.create()? How can I chain a sequence of Promises into an Observable?

推荐答案

使用.then()+递归解决了问题,而无需进行异步/等待:

The problem was solved using .then() + recursion, without async/await:

private getPages<T>(initialPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
    return Rx.Observable.create<T>(obs => {
        const getPage = (promise: PromiseLike<IODataCollectionResult<T>>) => {
            promise.then(page => {
                page.value.forEach(v => obs.onNext(v));
                if (page['@odata.nextLink']) {
                    let nextPageUrl = <string>page['@odata.nextLink'];
                    let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
                    getPage(nextPagePromise);
                }
                else {
                    obs.onCompleted();
                }
            });
        }
        getPage(initialPromise);
    });
}

这篇关于如何将Promises序列转换为Rx.RxJS可观察到?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆