如何使用异步函数使 RxJS 订阅序列化 onNext 调用? [英] How to make an RxJS subscription with an async function serialize the onNext calls?

查看:46
本文介绍了如何使用异步函数使 RxJS 订阅序列化 onNext 调用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是此问题.

所以 RxJS 正在用下一个事件调用我的异步函数,因为我在异步方法中等待后,前一个事件已经完成.

So RxJS is calling my async function with the next event bebore the previous one has completed as soon as I await in my async method.

我需要序列化对此异步函数的调用.

I need to serialize the calls to this async function.

我从这个 answer 中了解到一个类似的问题,我需要将我的异步功能从订阅和使用中移开concatMap.

I have understood from this answer to a similar question that I need to move my async function away from subscribe and use concatMap.

现在我的代码无法编译并出现以下错误:

Right now my code does not compile with the following error:

错误 TS2339:类型Observable"上不存在属性concatMap".

error TS2339: Property 'concatMap' does not exist on type 'Observable'.

我的代码(尝试调整):

My code (trying to adjust):

1/新的订阅代码(不会编译):

1/ the new subscription code (won't compile):

this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]); <= unchanged
this.rxSubscription = this.rxSubscription = this.emitter.concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value)).subscribe(); <= concatMap does not exist on type Observable<any>

2/异步函数供您参考:

2/ the async function for your information:

       handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
   ...
   await someMethodAsync();
   ...
}

concatMap 应该用于另一种类型的源,但我无法弄清楚.

concatMap should be used on another type of source but I can't figure it out.

提前致谢.

推荐答案

正如 Kos 所说,在 Rxjs v6 中,可管道操作符成为常态,不再使用 . 将所有内容链接在一起.我假设由于您使用的是 fromEventPattern,而不是 Observable.fromEventPattern,因此您使用的是 rxjs v6+,在这种情况下,您需要包装 concatMap()pipe() 中.

As Kos stated, in Rxjs v6, pipeable operators became the norm, moving away from chaining everything together with .. I assume since you're using fromEventPattern, instead of Observable.fromEventPattern that you are using rxjs v6+, in which case, you need to wrap concatMap() inside a pipe().

this.rxSubscription = this.emitter.pipe(concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value))).subscribe()

这篇关于如何使用异步函数使 RxJS 订阅序列化 onNext 调用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆