如何发送具有最大并行请求数的1000个XHTTP请求 [英] How to send 1000 XHTTP Requests with maximum number of parallel requests

查看:277
本文介绍了如何发送具有最大并行请求数的1000个XHTTP请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Angular应用程序,它需要发送N个XHTTP请求,其中1< = N< = 10000.

应用程序需要尽快处理它,因此最好同时存在多个活动的XHTTP请求,并同时具有多个请求的滑动窗口.由于服务器端API的限制,无法使用WebSocket或其他类似流的解决方案.

我的第一个想法是使用类似 RxJS forkJoin ,但我很难限制并发请求数.据我所知,最大请求数存在API限制,例如 Chrome只能同时允许8个请求.

大多数解决方案/指南我发现a.)没有限制并发连接的最大数量,或者b.)没有动态更新(超时解决方案对于此任务效率不高).

例如:

const test = () =>
  request(`https://swapi.co/api/people/1/`)
    .pipe(
      delay(1000),
      switchMap(response => from(response.films)),
      concatMap((url: string) => request(url).pipe(delay(1000))),
      scan((acc, res) => [...acc, res.title], []),
      tap(console.log)
    )
    .subscribe()

对我不利,因为限制是通过延迟来实现的,但是我想实现类似基于线程的解决方案:最多有Y个并发连接,并且如果一个完成,则立即开始一个新请求

const test = () =>
  request(`https://swapi.co/api/people/1/`)
    .pipe{
      switchMap(response => from(response.films)),
      specialOperatorIAmLookingFor((url: string) => request(url), 8),   // where '8' is the maximum number of paralell requests
      scan((acc, res) => [...acc, res.title], []),
      tap(console.log)
    )
    .subscribe()

有什么想法可以很好地解决这个问题吗? RxJS认为应该为已经编写的解决方案.

解决方案

您可以尝试使用RxJS concatMap 运算符与 forkJoin() .

来自bufferCount文档:

收集发出的值,直到满足提供的数量为止,发出为 数组.

因此,它收集n个通知,并将其作为数组发出.然后,我们可以将数组通过forkJoin()进行n个并行请求.

尝试以下

我认为this.urls是类似于以下内容的HTTP请求的集合

 urls = [
  this.http.get('url1'),
  this.http.get('url2'),
  this.http.get('url3'),
  ...
];
 

然后,请求触发代码看起来像

 bufferedRequests() {
  from(this.urls).pipe(
    bufferCount(6),      // <-- adjust number of parallel requests here
    concatMap(buffer => forkJoin(buffer))
  ).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('complete')
  );
}
 


根据Chromium的评论工程师,最大的实际解决方案与主机/域限制的连接将使用WebSocket或域分片.但是,由于您提到在您的环境中这是不可能的,因此可以使用缓冲的请求解决方法.

但是,我不会尝试缓冲到最大限制.如果您要向同一域发送的请求数超出了允许的上限,那么您会发现,在上述请求完成之前,其他请求实际上会滞后.假设如果要缓冲到最大允许的限制,并且您的应用程序从应用程序工作流所依赖的其他位置向同一域发送了另一个请求,则整个应用程序可能会受到限制.

因此,最好使用WebSockets或域分片.而且如果不可能,最好将请求缓冲到小于* <所允许的最大限制的多个请求中.

*显然,如果您100%地确定在缓冲过程中不会有其他请求触发到相同的域,则可以将其缓冲到最大.允许的限制.

I have an Angular application, which needs to send N XHTTP requests, where 1 <= N <= 10000.

The application needs to handle it as fast as possible, so preferably there should be multiple active XHTTP requests at the same time, with a slide-window of multiple requests at the same time. Using WebSocket, or other streaming-like solution is not possible due to server-side API limitations.

My first idea was to use something like RxJS forkJoin, but I struggle to limit the concurrent requests number. As far as I know, there are API limitations for max requests number, for instance Chrome will allow only 8 simultaneous requests.

Most of the solutions/tutorials I found either a.) does not limit the maximum number of concurrent connections or b.) does not update dynamically (timeout solutions are not efficient for this task).

For instance:

const test = () =>
  request(`https://swapi.co/api/people/1/`)
    .pipe(
      delay(1000),
      switchMap(response => from(response.films)),
      concatMap((url: string) => request(url).pipe(delay(1000))),
      scan((acc, res) => [...acc, res.title], []),
      tap(console.log)
    )
    .subscribe()

is not good for me, as the limitation is achieved by delay, but I would like to achieve something like a thread based solution: there are maximum of Y number of concurrent connections, and if one finishes, a new request starts immediately.

const test = () =>
  request(`https://swapi.co/api/people/1/`)
    .pipe{
      switchMap(response => from(response.films)),
      specialOperatorIAmLookingFor((url: string) => request(url), 8),   // where '8' is the maximum number of paralell requests
      scan((acc, res) => [...acc, res.title], []),
      tap(console.log)
    )
    .subscribe()

Any ideas how to solve this nicely? RxJS feels like there should be a solution for this already written.

解决方案

You could try to use RxJS bufferCount and concatMap operators along with forkJoin().

From bufferCount docs:

Collect emitted values until provided number is fulfilled, emit as array.

So it collects n number of notifications and emit it as an array. We could then pass the array through forkJoin() for n number of parallel requests.

Try the following

I assume this.urls is a collection of HTTP requests similar to

urls = [
  this.http.get('url1'),
  this.http.get('url2'),
  this.http.get('url3'),
  ...
];

Then the requests triggering code would look like

bufferedRequests() {
  from(this.urls).pipe(
    bufferCount(6),      // <-- adjust number of parallel requests here
    concatMap(buffer => forkJoin(buffer))
  ).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('complete')
  );
}


According to this comment by a Chromium engineer, actual solution to the max. connections to host/domain limit would be to use WebSockets or Domain sharding. But since you mention it isn't possible in your environment, you could use the buffered request workaround.

However I wouldn't try to buffer to the max limit. If you were to send more requests to the same domain than the max allowed, you could see that the additional requests would actually throttle behind till the above requests are finished. So say if were to buffer to the max allowed limit, and your application sends an additional request to the same domain from somewhere else on which the app workflow depends on, the entire app could throttle.

So it's better to either use WebSockets or Domain sharding. And if neither is possible it's better to buffer the requests to a number of requests less than* the max allowed limit.

* Obviously, if you're 100% percent sure no other requests will be triggered to the same domain during the buffering procedure, then you could buffer to max. allowed limit.

这篇关于如何发送具有最大并行请求数的1000个XHTTP请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆