如何将Promises序列转换为Rx.RxJS可观察到? [英] How to convert a sequence of Promises into Rx.Observable with RxJS?
问题描述
我正在尝试使用RxJS从Promises链中创建Rx.Observable
.与此问题的区别在于,我的Promises数量未知,每个Promise都取决于上一个的结果.
I'm trying to create an Rx.Observable
from a chain of Promises with RxJS. The difference from this question is that I have unknown number of Promises, and every Promise depends on the result of the previous one.
基本上,我有一系列页面,这些页面与下一页"链接相连.
Basically I have a sequence of pages, connected with "next page" links.
我希望函数执行的操作是:
What I want the function to do is:
- 等待承诺<>
- 提供结果(fire Observer.onNext())
- 检查是否存在下一页链接
- 使用该链接创建下一个Promise<>
- 重复直到剩余页面
我尝试了以下操作:
private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
let observable = Rx.Observable.create<T>(async obs => {
let page = await firstPromise;
page.value.forEach(v => obs.onNext(v));
while (page['@odata.nextLink']) {
let nextPageUrl = <string>page['@odata.nextLink'];
let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
page = await nextPagePromise;
page.value.forEach(v => obs.onNext(v));
}
obs.onCompleted();
});
return observable;
}
(IODataCollectionResult
是OData结果,其中'@ odata.nextLink'是下一页的URL,.value是值的数组)
(IODataCollectionResult
is a OData result, where '@odata.nextLink' is the next page url and .value is an array of values)
问题是,我无法使用TypeScript进行编译,这给了我一个错误:
The problem is I can't compile that with TypeScript, it gives me an error:
类型'(obs:Observer)=> Promise'的参数不能分配给类型'(observer:Observer)=> void |功能介绍IDisposable".
Argument of type '(obs: Observer) => Promise' is not assignable to parameter of type '(observer: Observer) => void | Function | IDisposable'.
这很有意义,因为异步函数返回的是Promise<void>
而不是void
.
Which makes sense, because async function returns a Promise<void>
, not a void
.
这是否意味着我不能对Rx.Observable.create()使用async/await?如何将一系列Promises链接到Observable中?
Does it mean I cannot use async/await with the Rx.Observable.create()? How can I chain a sequence of Promises into an Observable?
推荐答案
使用.then()+递归解决了问题,而无需进行异步/等待:
The problem was solved using .then() + recursion, without async/await:
private getPages<T>(initialPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
return Rx.Observable.create<T>(obs => {
const getPage = (promise: PromiseLike<IODataCollectionResult<T>>) => {
promise.then(page => {
page.value.forEach(v => obs.onNext(v));
if (page['@odata.nextLink']) {
let nextPageUrl = <string>page['@odata.nextLink'];
let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
getPage(nextPagePromise);
}
else {
obs.onCompleted();
}
});
}
getPage(initialPromise);
});
}
这篇关于如何将Promises序列转换为Rx.RxJS可观察到?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!