RxJs - 将 Observable 数组转换为发射值数组 [英] RxJs - transform array of Observables to an array of emitted values

查看:27
本文介绍了RxJs - 将 Observable 数组转换为发射值数组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

抱歉标题,我想不出更好的.
我有这段代码,基本上:

Sorry for the title, I couldn't think of a better one.
I've got this piece of code, which basically:

  1. 过滤有效(非空)cron epressions 的数组
  2. 将每个 cron 表达式映射到对服务的调用

<小时>

this.formGroup.valueChanges.pipe(
    op.filter(v => !!v.cronExpressions),
    op.map((v): string[] => v.cronExpressions),
    op.map((v: string[]) => v.map(cron =>
            this.cronService.getReadableForm(cron).pipe(
                op.map(this.toDescription),
                op.map((description): CronExpressionModel => ({ cron, description }))
            )
        )
    ),
    // What now?
).subscribe((cronExpressions: CronExpressionModel[]) => ...) // Expected result

我想在 subscribe() 上获取从所有服务调用返回的 CronExpressionModel 数组.

I'd like to get, on subscribe(), the array of CronExpressionModel returned from all the services calls.

我无法理解这一点.

当前的解决方案,根据 Martin 的回答:

Current solution, as per Martin answer:

filter(v => !!v.cronExpressions),
map(v => v.cronExpressions),
map(cronExprs => cronExprs.map(c => this.invokeCronService(c))),
mergeMap(serviceCalls => forkJoin(serviceCalls).pipe(defaultIfEmpty([])))

推荐答案

要将流转换为数组,可以使用 toArray 运算符.

To transform a stream into an array, you can use toArray operator.

这是一个建议:

this.formGroup.valueChanges.pipe(
    filter(v => !!v.cronExpressions),
    // transform [item1, item2...] into a stream ----item1----item2----> 
    concatMap((v): Observable<string> => from(v.cronExpressions).pipe(
        // for each of the items, make a request and wait for it to respond
        concatMap((cron: string) => this.cronService.getReadableForm(cron)),
        map(this.toDescription),
        map((description): CronExpressionModel => ({ cron, description })),
        // wait for observables to complete. When all the requests are made, 
        // return an array containing all responses
        toArray()
      )
    ).subscribe((cronExpressions: CronExpressions[]) => ...) // Expected result

注意:

您可以使用 mergeMap 而不是 concatMap 来并行化请求.但你需要知道你在做什么;)

You can use mergeMap instead of concatMap to parallelize the requests. But you need to know what you're doing ;)

这篇关于RxJs - 将 Observable 数组转换为发射值数组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆