Choose proper async method for batch processing for max requests/sec


Question

I need to call an external API in a loop with some delay between calls, to avoid hitting the 'User Rate Limit Exceeded' restriction.

The Google Maps Geocoding API is sensitive to requests per second and allows 10 req/sec. I need to geocode hundreds of my contacts, so such a delay is required. That means I need 10 asynchronous geocoding functions, each followed by a 1-second delay. So I collect all the contacts in an array and then loop through the array asynchronously.

In general, I need N simultaneous threads, with a delay of D msecs at the end of each thread. The entire loop iterates over an array of User entities. Each thread processes a single entity, as usual.

I'd like to have code like this:

const N = 10;   // threads count
const D = 1000; // delay after each execution, in ms

var processUser = function(user, callback) {
  someBusinessLogicProc(user, function(err) {
    // wait D ms after each call before reporting completion
    setTimeout(function() {
      return callback(err);
    }, D);
  });
};

var async = require('async');
var people = new Array(900);

// batchMethod is pseudocode: it is the method being asked for
async.batchMethod(people, processUser, N, finalCallback);


In this pseudocode, batchMethod is the method I am asking for.

Recommended answer

Putting a delay on the results is not really what you want. Instead, you want to keep track of what you've sent and when you sent it, so that as soon as you fall under the requests-per-second boundary, you can send another request.

Here's a general concept for a function that limits the rate to a fixed number of requests per second for you. It uses promises and requires that you supply a request function that returns a promise (if you aren't using promises now, you just need to wrap your request function in a promise).

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolved values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length >= requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta >= 0) {
                    setTimeout(runMore, ++delta);
                }

            }
        }
        runMore();
    });
}

Example usage:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});
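
If your request function is callback-based, like someBusinessLogicProc in the question, wrapping it in a promise might look roughly like the sketch below. The body of someBusinessLogicProc here is a hypothetical stand-in, and the assumption that it calls back with (err, result) is mine; the question only shows an err argument.

```javascript
// Hypothetical stand-in for the callback-style function from the question.
// Assumed to call back with (err) on failure or (null, result) on success.
function someBusinessLogicProc(user, callback) {
    setTimeout(function() {
        if (!user) {
            callback(new Error('no user given'));
        } else {
            callback(null, 'processed ' + user);
        }
    }, 10);
}

// Wrap the callback API in a function that returns a promise,
// which is the shape rateLimitMap() expects for its fn argument.
function processUser(user) {
    return new Promise(function(resolve, reject) {
        someBusinessLogicProc(user, function(err, result) {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });
    });
}
```

processUser can then be passed as the last argument to rateLimitMap(). In Node.js, util.promisify does the same job for functions that follow the standard (err, value) callback convention.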

A more advanced version of this function, called rateMap(), is available on GitHub.

The general idea behind this code is:

  1. You pass in an array to iterate through.
  2. It returns a promise whose resolved value is an array of results (in order).
  3. You pass the maximum number of requests per second that may ever be reached.
  4. You pass the maximum number of requests in flight at the same time.
  5. You pass a function that will be passed an element from the array being iterated and must return a promise.
  6. It keeps an array of timestamps recording when each request was sent.
  7. To see if another request can be sent, it looks backwards in the array and counts how many requests were sent in the last second.
  8. If that number is lower than the threshold, it sends another one.
  9. If that number meets the threshold, it calculates how long you have to wait before sending another one and sets a timer for that amount of time.
  10. Upon completion of each request, it checks to see if it can send more.
  11. If any request rejects its promise, the returned promise rejects immediately. If you don't want it to stop upon the first error, modify your passed-in function so that it does not reject, but instead resolves with some value that you can identify as a failed request later when processing the results.
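
The last point could be handled with a small wrapper along these lines. This is only a sketch: neverReject and the { ok, value, error } result shape are hypothetical names chosen for illustration, not part of the original answer.

```javascript
// Wrap a promise-returning function so the returned promise never rejects.
// Success resolves to { ok: true, value }, failure to { ok: false, error }
// (an assumed result shape, chosen only for this example).
function neverReject(fn) {
    return function(item) {
        return Promise.resolve()
            .then(function() { return fn(item); })
            .then(function(value) {
                return { ok: true, value: value };
            }, function(error) {
                return { ok: false, error: error };
            });
    };
}

// Intended use with the function above:
//   rateLimitMap(inputArrayToProcess, 9, 20, neverReject(myRequestFunc))
//       .then(function(results) {
//           var failed = results.filter(function(r) { return !r.ok; });
//       });
```

With this wrapper the whole array is always processed, and failed items can be picked out of the results afterwards.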

Here's a working simulation: https://jsfiddle.net/jfriend00/3gr0tq7k/

Note: If the maxInFlight value you pass in is higher than the requestsPerSec value, then this function will basically just send requestsPerSec requests and then, one second later, send another requestsPerSec requests, since that's the quickest way to stay under the requestsPerSec boundary. If the maxInFlight value is the same as or lower than requestsPerSec, then it will send maxInFlight requests at first, and then as each request finishes, it will see if it can send another one.
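
To make the timing in this note concrete, the wait calculation can be sketched as a pure function. This is an illustration only: msUntilNextSlot and its parameters are hypothetical names, not part of rateLimitMap(), and unlike the original it takes the current time as an argument so the results are easy to check by hand.

```javascript
// Given launch timestamps in ascending order (ms), return how many ms to wait
// before another request may be sent without exceeding requestsPerSec
// launches in any 1000 ms window. Returns 0 if one can be sent right away.
function msUntilNextSlot(launchTimes, requestsPerSec, now) {
    if (launchTimes.length < requestsPerSec) {
        return 0;
    }
    // The launch that happened requestsPerSec launches ago must be
    // at least 1000 ms old before a new request may go out.
    var oldest = launchTimes[launchTimes.length - requestsPerSec];
    var wait = 1000 - (now - oldest);
    return wait > 0 ? wait : 0;
}

// Three requests sent as a burst with a limit of 3 per second:
var launches = [0, 5, 10];
console.log(msUntilNextSlot(launches, 3, 400));  // 600
console.log(msUntilNextSlot(launches, 3, 1000)); // 0
console.log(msUntilNextSlot([0, 5], 3, 10));     // 0 (limit not reached yet)
```

After a burst of requestsPerSec launches, nothing more can go out until the first launch of the burst is a full second old, which is why a high maxInFlight produces one burst per second.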
