Rxjs:实现队列 [英] Rxjs: implementing a queue

查看:63
本文介绍了Rxjs:实现队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 promises,实现一个队列来防止多个 HTTP 请求并行运行是很容易的:

With promises, it's really easy to implement a queue to prevent for example multiple HTTP requests from running in parallel:

class Runner {
  private promise;
  constructor(http) {
    this.promise = q.resolve();
  }
  getUrl() {
    return this.promise = this.promise.then(() => http.get('http://someurl'))
  }
}

var runner = new Runner(http);

var lastPromise;
for (var i = 0; i < 10; i++) {
  lastPromise = runner.getUrl();
}

lastPromise.then(() => console.log("job's done!");

我不知道如何在 Rxjs 中做到这一点.如果我尝试与上述类似的操作,当我添加请求时,所有先前的 HTTP 调用都会重复,因为它只是添加到流中并重新运行整个过程.

I can't figure out how to do this in Rxjs tho. If I try something similar to the above, all previous HTTP calls get repeated when I add a request because it just adds to the stream and reruns the whole thing.

我读过一些关于队列调度程序的内容,但它似乎不存在(不再)?

I read something about a queue scheduler, but that doesn't seem to exist (anymore)?

推荐答案

你可以像@cartant 建议的那样使用 concat:

You can use concat like @cartant suggested:

const urlQueue = Observable.fromPromise(http.get('http://someurl'))
  .concat(Observable.fromPromise(http.get('http://someurl')))
  .concat(Observable.fromPromise(http.get('http://someurl')));

但是您需要在订阅并让队列处理它之前构造这样一个流.还;fromPromise 仍然是渴望的,所以当你调用上面的代码时,你的承诺都会直接开始运行.要解决这个问题,您需要使用 Defer():

But you would need to construct such a stream before subscribing and letting the queue handle it. Also; fromPromise is still eager so your promises will all start running directly when you invoke above code. To solve this you would need to use Defer():

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queue = urls
  .map(url => Observable.defer(() => http.get(url))
  .reduce((acc, curr) => acc.concat(curr));

此方法使用原生数组 map 将 url 转换为 Observables,然后使用 reduce 将它们全部连接成一个大流.

This approach uses the native array map to convert the urls to Observables and then uses reduce to concat them all together into one big stream.

更好的解决方案是将您的网址放入流中,然后使用 mergeMap 附加了并发:

A better solution would be to get your url's into a stream and then use mergeMap with a concurrency appended to it:

const urls = [
  'http://someurl',
  'http://someurl',
  'http://someurl',
];

const queuedGets = Observable.from(urls)
  .mergeMap(url => http.get(url), null, 1);

这将导致在前一个完成后一个一个地检索网址,但您仍然需要在开始之前准备好所有网址.根据您的用例,这可能就足够了.请注意,并发设置为 1mergeMap 等效于仅使用 concatMap

This will result in the urls being retrieved one by one after the previous one has completed but you still need to have all urls ready before starting. Depending on your usecase this might suffice. Note that a mergeMap with concurrency set to 1 is the equivalent of just using concatMap

难题的最后一部分可能是您需要按照自己的节奏将新网址推送到您的队列中.为此,您需要一个 主题

The last part of the puzzle maybe is that you need to push new urls into your queue in your own tempo. To do so you would need a Subject

一个Subject 就像一个Observable,但可以多播给多个Observer.Subject 就像 EventEmitters:它们维护着一个包含许多监听器的注册表.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

class HttpGetQueue {
  const queue = new Subject();

  constructor() {
    public results = queue
      .mergeMap(url => http.get(url), null, 1);
  }

  addToQueue(url) {
    queue.next(url);
  }
}

const instance = new HttpGetQueue();
instance.results.subscribe(res => console.log('got res: ' + res);
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');
instance.addToQueue('http://someurl');

这篇关于Rxjs:实现队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆