RxJS:使用 switchMap 产生 N 个值的背压 [英] RxJS: Backpressure with switchMap producing N values

查看:49
本文介绍了RxJS:使用 switchMap 产生 N 个值的背压的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在与一些涉及分页的 RESTful 搜索端点交谈.查询是由用户在搜索字段中输入触发的,因此它会生成一个具有 N 个值的 Observable,对应于 N 个结果页面.

I am talking to some RESTful search endpoint that involves pagination. The query is triggered by user typing in a search field, and as a result it produces an Observable with N values, corresponding to the N result pages.

代码如下所示:

function runQueries(queryObservable) {
  return queryObservable
           .debounceTime(500)
           .distinctUntilChanged()
           .switchMap(search);
}

function search(query) {
  return Observable.create(observer => searchInto(observer, query, 0));
}

function searchInto(observer, query, start) {
  runQuery(query, start).subscribe(result => {
    observer.next(result);
    if (hasMorePages(result)) {
      searchInto(observer, query, start + 1);
    } else {
      observer.complete();
    }
  });
}

现在,搜索请求可能需要一段时间,如果用户更改查询,我不想检索所有页面.

Now, the search requests can take a while and I don't want to retrieve all the pages if user changes the query.

假设搜索返回 3 页,用户在加载一页后更改查询.我想看到类似的东西:

Let's say the search return 3 pages, and user changes the query after one page is loaded. I want to see something like:

USER: types query A
CODE: loads page A1
USER: types query B
CODE: loads page B1
CODE: loads page B2
CODE: loads page B3

switchMap 完成了一半的工作.生成的 observable 具有正确的序列:A1、B1、B2、B3.太好了.

switchMap gets half of the job done. The resulting observable has the correct sequence: A1, B1, B2, B3. Great.

然而,在幕后,我的递归搜索仍在运行所有查询,给服务器、网络等带来不必要的负载.switchMap 确实丢弃了过时"的结果,但它并没有阻止递归函数从工作到结束.换句话说,它看起来像:

However, behind the scenes my recursive search is still running all the queries, putting unnecessary load on the server, network, etc. switchMap does discard the "obsolete" results, but it does not prevent the recursive function from doing its work to the end. In other words, it looks like:

USER: types query A
CODE: loads page A1   -> returned by search observable
USER: types query B
CODE: loads page A2   -> discarded by search observable
CODE: loads page B1   -> returned by search observable
CODE: loads page B2   -> returned by search observable
CODE: loads page A3   -> discarded by search observable
CODE: loads page B3   -> returned by search observable

A"和B"的顺序是随机的(受竞争条件的影响),但没关系.

The sequence of "A" and "B" is random (subject to race conditions), but it doesn't matter.

我做错了什么?对此的惯用解决方案是什么?

What am I doing wrong? What is the idiomatic solution to this?

推荐答案

switchMap 只能在返回可取消的 Observable 时取消操作.由于您没有在 Observable.create 中返回 Subscription,因此无法取消进行中的操作.

switchMap can only cancel an operation if you return an Observable that is cancelable. Since you are not returning the Subscription within Observable.create it can't cancel the in-flight operations.

在这种情况下,使用 Observable.create 并不能真正帮助您,我建议您使用 expand 运算符来执行递归操作:

As it stands using Observable.create is not really helping you in this case, I would suggest that you make use of the expand operator to perform recursive operations instead:

function runQueries(queryObservable) {
  return queryObservable
           .debounceTime(500)
           .distinctUntilChanged()
           .switchMap(search);
}

function search(query) {
  //Kicks off the first query
  return runQuery(query, 0)
      //Uses the results of the first query to see if more queries should be made
     .expand((result, idx) => 
       //Continues to execute more queries until `hasMorePages` is false
       hasMorePages(result) ? 
         runQuery(query, idx + 1) : 
         Observable.empty());
}

这篇关于RxJS:使用 switchMap 产生 N 个值的背压的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆