为最大请求/秒的批处理选择适当的异步方法 [英] Choose proper async method for batch processing for max requests/sec

查看:26
本文介绍了为最大请求/秒的批处理选择适当的异步方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要对某些外部 API 执行一些延迟的循环调用,以防止超出用户速率限制"限制.

I need to perform a cyclic call to some external API with some delay, to prevent from 'User Rate Limit Exceeded' restriction.

Google Maps Geocoding API 对req/sec"敏感,允许 10 req/sec.我应该为我的数百个联系人进行地理编码,并且需要这样的延迟.所以,我需要有 10 个异步地理编码功能,每个功能在 1 秒后延迟.因此,我收集数组中的所有联系人,然后以异步方式循环遍历数组.

Google Maps Geocoding API is sensitive to 'req/sec', allowing 10 req/sec. I should make geocoding for hundreds of my contacts, and such delay is required. So, I need have a 10 async geocoding functions with post-delay in 1 sec for each. So, I collect all contacts in array, and then I loop through array in async manner.

一般来说,我需要有 N 个并发线程,每个线程结束时延迟 D 毫秒.整个循环遍历用户实体数组.像往常一样,每个线程处理单个实体.

Generally, I need to have a N simultaneous threads, with a delay in D msecs in the end of each thread. Entire loop iterates over an array of User entities. Each thread process single entity, as usual.

我想有这样的代码:

const N = 10;   # threads count
const D = 1000; # delay after each execution

var processUser = function(user, callback){ 
  someBusinessLogicProc(user, function(err) {
    setTimeout(function() {
      return callback(err);
    }, D);
  });      
 }

 var async = require('async') ;
 var people = new Array(900);

 async.batchMethod(people, processUser, N, finalCallback);

<小时>

在这个伪代码中,batchMethod 是我要求的方法.

推荐答案

延迟结果并不是你真正想要的.相反,您希望跟踪已发送的内容和发送时间,以便一旦您低于每秒请求数边界,您就可以发送另一个请求.

Putting a delay on the results is not really what you want. Instead, you want to keep track of what you've sent and when you sent it so as soon as you fall under the requests per second boundary, you can send another request.

以下是一个函数的一般概念,该函数将控制速率限制为每秒固定的请求数.这使用了 Promise,并要求您提供一个返回 Promise 的请求函数(如果您现在不使用 Promise,您只需将您的请求函数包装在 Promise 中).

Here's a general concept for a function that will control rate limiting for you to a fixed number of requests per second. This uses promises and requires that you supply a request function that returns a promise (if you aren't using promises now, you just need to wrap your request function in a promise).

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolves values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }

            }
        }
        runMore();
    });
}

示例用法:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});

此函数的更高级版本称为 rateMap()在 Github 上.

A more advanced version of this function called rateMap() is here on Github.

这段代码背后的总体思路是这样的:

The general idea behind this code is this:

  1. 你传入一个数组进行迭代
  2. 它返回一个 promise,它的解析值是一个结果数组(按顺序)
  3. 您传递的 requestsPerSec 的最大数量一直命中
  4. 您同时传递的请求数达到上限
  5. 您传递一个函数,该函数将从正在迭代的数组中传递一个元素,并且必须返回一个承诺
  6. 它保存上次发送请求时的时间戳数组.
  7. 要查看是否可以发送另一个请求,它会在数组中向后查找并计算最后一秒发送了多少请求.
  8. 如果该数字低于阈值,则发送另一个.
  9. 如果该数字达到阈值,则它会计算您必须等待多长时间才能发送另一个并为该时间设置一个计时器.
  10. 完成每个请求后,它会检查是否可以发送更多请求
  11. 如果任何请求拒绝其承诺,则返回的承诺立即拒绝.如果您不希望它在第一个错误时停止,则将传入的函数修改为不拒绝,而是使用某个值进行解析,以便稍后在处理结果时将其识别为失败的请求.

这是一个工作模拟:https://jsfiddle.net/jfriend00/3gr0tq7k/

注意:如果你传入的 maxInFlight 值高于 requestsPerSec 值,那么这个函数基本上只会发送 requestsPerSec 请求,然后一秒后发送另一个requestsPerSec 请求,因为这是保持在 requestsPerSec 边界下的最快方式.如果 maxInFlight 值等于或低于 requestsPerSec 则它会发送 requestsPerSec 然后当每个请求完成时,它会查看是否可以再发一个.

Note: If the maxInFlight value you pass in is higher than the requestsPerSec value, then this function will basically just send requestsPerSec requests and then one second later, send another requestsPerSec requests since that's the quickest way to stay under the requestsPerSec boundary. If the maxInFlight value is the same or lower than requestsPerSec then it will send requestsPerSec and then as each request finishes, it will see if it can send another one.

这篇关于为最大请求/秒的批处理选择适当的异步方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆