NodeJs delay each promise within Promise.all()


Problem description


I'm trying to update a tool that was created a while ago which uses nodejs (I am not a JS developer, so I'm trying to piece the code together) and am getting stuck at the last hurdle.

The new functionality takes in a Swagger .json definition, compares its endpoints against the matching API Gateway on AWS using the 'aws-sdk' SDK for JS, and then updates the Gateway accordingly.

The code runs fine on a small definition file (about 15 endpoints) but as soon as I give it a bigger one, I start getting tons of TooManyRequestsException errors.

I understand that this is because my calls to the API Gateway service are made too quickly and a delay or pause is needed. This is where I am stuck.

I have tried:

  • adding a delay() to each promise being returned
  • running a setTimeout() in each promise
  • adding a delay to the Promise.all and Promise.mapSeries

Currently my code loops through each endpoint within the definition and then adds the response of each promise to a promise array:

promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath)); 

Once the loop is finished I run this:

        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })

I have tried the same with a mapSeries (no luck).

It looks like the functions within the getMethodResponse promise are run immediately, so no matter what type of delay I add, they all still just execute. My suspicion is that I need to make getMethodResponse return a function and then use mapSeries, but I can't get this to work either.

Code I tried: Wrapped the getMethodResponse in this:

return function(value){}

Then added this after the loop (and within the loop - no difference):

 Promise.mapSeries(function (promises) {
 return 'a'();
 }).then(function (results) {
 console.log('result', results);
 });

Also tried many other suggestions:

Here

Here

Any suggestions please?

EDIT

As requested, here is some additional code to help pinpoint the issue.

The code currently working with a small set of endpoints (within the Swagger file):

module.exports = (apiName, externalUrl) => {

return getSwaggerFromHttp(externalUrl)
    .then((swagger) => {
        let paths = swagger.paths;
        let resourcePath = '';
        let resourceMethod = '';
        let promises = [];

        _.each(paths, function (value, key) {
            resourcePath = key;
            _.each(value, function (value, key) {
                resourceMethod = key;
                let statusList = [];
                _.each(value.responses, function (value, key) {
                    if (key >= 200 && key <= 204) {
                        statusList.push(key)
                    }
                });
                _.each(statusList, function (value, key) { //Only for 200-201 range  

                    //Working with small set 
                    promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))
                });             
            });
        });

        //Working with small set
        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })
    })
    .catch((err) => {
        winston.error(err);
    });

};

I have since tried adding this in place of the return Promise.all():

            Promise.map(promises, function() {
            // Promise.map awaits for returned promises as well.
            console.log('X');
        },{concurrency: 5})
        .then(function() {
            return console.log("y");
        });

This prints something like the following (it's the same for each endpoint, and there are many):

Error: TooManyRequestsException: Too Many Requests
X
Error: TooManyRequestsException: Too Many Requests
X
Error: TooManyRequestsException: Too Many Requests

The AWS SDK is being called 3 times within each promise, the functions of which are (get initiated from the getMethodResponse() function):

apigateway.getRestApisAsync()
return apigateway.getResourcesAsync(resourceParams)
apigateway.getMethodAsync(params, function (err, data) {}

The AWS SDK documentation states that this is typical behaviour when too many consecutive calls are made too fast. I've had a similar issue in the past, which was resolved by simply adding a .delay(500) to the code being called.

Something like:

    return apigateway.updateModelAsync(updateModelParams)
    .tap(() => logger.verbose(`Updated model ${updatedModel.name}`))
    .tap(() => bar.tick())
    .delay(500)

EDIT #2

In the name of thoroughness, I thought I would include my entire .js file.

'use strict';

const AWS = require('aws-sdk');
let apigateway, lambda;
const Promise = require('bluebird');
const R = require('ramda');
const logger = require('../logger');
const config = require('../config/default');
const helpers = require('../library/helpers');
const winston = require('winston');
const request = require('request');
const _ = require('lodash');
const region = 'ap-southeast-2';
const methodLib = require('../aws/methods');

const emitter = require('../library/emitter');
emitter.on('updateRegion', (region) => {
    region = region;
    AWS.config.update({ region: region });
    apigateway = new AWS.APIGateway({ apiVersion: '2015-07-09' });
    Promise.promisifyAll(apigateway);
});

function getSwaggerFromHttp(externalUrl) {
    return new Promise((resolve, reject) => {
        request.get({
            url: externalUrl,
            header: {
                "content-type": "application/json"
            }
        }, (err, res, body) => {
            if (err) {
                winston.error(err);
                reject(err);
            }

            let result = JSON.parse(body);
            resolve(result);
        })
    });
}

/*
    Deletes a method response
*/
function deleteMethodResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteMethodResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Method response ${statusCode} deleted for path: ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Method Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Deletes an integration response
*/
function deleteIntegrationResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteIntegrationResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Integration response ${statusCode} deleted for path ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Integration Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Get Resource
*/
function getMethodResponse(httpMethod, statusCode, apiName, resourcePath) {

    let params = {
        httpMethod: httpMethod.toUpperCase(),
        resourceId: '',
        restApiId: ''
    }

    return getResourceDetails(apiName, resourcePath)
        .error((e) => {
            logger.unimportant('Error: ' + e.stack)
        }) 
        .then((result) => {
            //Only run the comparrison of models if the resourceId (from the url passed in) is found within the AWS Gateway
            if (result) {
                params.resourceId = result.resourceId
                params.restApiId = result.apiId

                var awsMethodResponses = [];
                try {
                    apigateway.getMethodAsync(params, function (err, data) {
                        if (err) {
                            if (err.statusCode == 404) {
                                return console.log(`Method ${params.httpMethod} not found on resource path: ${resourcePath} (resourceId: ${params.resourceId})`); // an error occurred
                            }
                            console.log(err, err.stack); // an error occurred
                        }
                        else {
                            if (data) {
                                _.each(data.methodResponses, function (value, key) {
                                    if (key >= 200 && key <= 204) {
                                        awsMethodResponses.push(key)
                                    }
                                });
                                awsMethodResponses = _.pull(awsMethodResponses, statusCode); //List of items not found within the Gateway - to be removed.
                                _.each(awsMethodResponses, function (value, key) {
                                    if (data.methodResponses[value].responseModels) {
                                        var existingModel = data.methodResponses[value].responseModels['application/json']; //Check if there is currently a model attached to the resource / method about to be deleted
                                        methodLib.updateResponseAssociation(params.httpMethod, params.resourceId, params.restApiId, statusCode, existingModel); //Associate this model to the same resource / method, under the new response status
                                    }
                                    deleteMethodResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                    deleteIntegrationResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                })
                            }
                        }
                    })
                        .catch(err => {
                            console.log(`Error: ${err}`);
                        });
                }
                catch (e) {
                    console.log(`getMethodAsync failed, Error: ${e}`);
                }
            }
        })
};

function getResourceDetails(apiName, resourcePath) {

    let resourceExpr = new RegExp(resourcePath + '$', 'i');

    let result = {
        apiId: '',
        resourceId: '',
        path: ''
    }

    return helpers.apiByName(apiName, AWS.config.region)
        .delay(1200)
        .then(apiId => {
            result.apiId = apiId;

            let resourceParams = {
                restApiId: apiId,
                limit: config.awsGetResourceLimit,
            };

            return apigateway.getResourcesAsync(resourceParams)

        })
        .then(R.prop('items'))
        .filter(R.pipe(R.prop('path'), R.test(resourceExpr)))
        .tap(helpers.handleNotFound('resource'))
        .then(R.head)
        .then([R.prop('path'), R.prop('id')])
        .then(returnedObj => {
            if (returnedObj.id) {
                result.path = returnedObj.path;
                result.resourceId = returnedObj.id;
                logger.unimportant(`ApiId: ${result.apiId} | ResourceId: ${result.resourceId} | Path: ${result.path}`);
                return result;
            }
        })
        .catch(err => {
            console.log(`Error: ${err} on API: ${apiName} Resource: ${resourcePath}`);
        });
};

function delay(t) {
    return new Promise(function(resolve) { 
        setTimeout(resolve, t)
    });
 }

module.exports = (apiName, externalUrl) => {

    return getSwaggerFromHttp(externalUrl)
        .then((swagger) => {
            let paths = swagger.paths;
            let resourcePath = '';
            let resourceMethod = '';
            let promises = [];

            _.each(paths, function (value, key) {
                resourcePath = key;
                _.each(value, function (value, key) {
                    resourceMethod = key;
                    let statusList = [];
                    _.each(value.responses, function (value, key) {
                        if (key >= 200 && key <= 204) {
                            statusList.push(key)
                        }
                    });
                    _.each(statusList, function (value, key) { //Only for 200-201 range  

                        promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))

                    });             
                });
            });

            //Working with small set
            return Promise.all(promises)
            .catch((err) => {
                winston.error(err);
            })
        })
        .catch((err) => {
            winston.error(err);
        });
};

Solution

You apparently have a misunderstanding about what Promise.all() and Promise.map() do.

All Promise.all() does is keep track of a whole array of promises to tell you when the async operations they represent are all done (or one returns an error). When you pass it an array of promises (as you are doing), ALL those async operations have already been started in parallel. So, if you're trying to limit how many async operations are in flight at the same time, it's already too late at that point. So, Promise.all() by itself won't help you control how many are running at once in any way.

"I've also noticed since, that it seems this line promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath)) is actually executing promises and not simply adding them to the array. Seems like the last Promise.all() doesn't actually do much."

Yep, when you execute promises.push(getMethodResponse()), you are calling getMethodResponse() immediately right then. That starts the async operation immediately. That function then returns a promise and Promise.all() will monitor that promise (along with all the other ones you put in the array) to tell you when they are all done. That's all Promise.all() does. It monitors operations you've already started. To keep the max number of requests in flight at the same time below some threshold, you have to NOT START the async operations all at once like you are doing. Promise.all() does not do that for you.
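
The difference between starting an operation and merely describing it can be seen in a small sketch. Here fakeRequest is a hypothetical stand-in for getMethodResponse() (it is not part of your code or of any library); it records the moment it is called and resolves after a short timer.

```javascript
// Hypothetical stand-in for getMethodResponse(): it records when it is
// called and resolves after a short timer.
const started = [];

function fakeRequest(id) {
    started.push(id);   // runs synchronously, at call time
    return new Promise((resolve) => setTimeout(() => resolve(id), 10));
}

// Eager: every call happens inside the loop, so all three requests
// are already in flight before Promise.all() is even reached.
const promises = [];
for (const id of [1, 2, 3]) {
    promises.push(fakeRequest(id));
}
console.log('started after eager loop:', started.length);

// Deferred: store functions instead. Nothing starts until one is invoked.
const tasks = [4, 5, 6].map((id) => () => fakeRequest(id));
console.log('started after building tasks:', started.length);
```

Both log lines report the same count, because building the tasks array calls nothing. Whatever controls the pace has to be the thing that invokes those functions.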


For Bluebird's Promise.map() to help you at all, you have to pass it an array of DATA, not promises. When you pass it an array of promises that represent async operations that you've already started, it can do no more than Promise.all() can do. But, if you pass it an array of data and a callback function that can then initiate an async operation for each element of data in the array, THEN it can help you when you use the concurrency option.

Your code is pretty complex so I will illustrate with a simple web scraper that wants to read a large list of URLs, but for memory considerations, only process 20 at a time.

const rp = require('request-promise');
let urls = [...];    // large array of URLs to process

Promise.map(urls, function(url) {
    return rp(url).then(function(data) {
        // process scraped data here
        return someValue;
    });
}, {concurrency: 20}).then(function(results) {
   // process array of results here
}).catch(function(err) {
    // error here
});

In this example, hopefully you can see that an array of data items is being passed into Promise.map() (not an array of promises). This allows Promise.map() to manage how and when the array is processed and, in this case, it will use the concurrency: 20 setting to make sure that no more than 20 requests are in flight at the same time.
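
To make the mechanism concrete, here is a minimal, dependency-free sketch of what a concurrency-limited map does. It is not Bluebird's implementation, only an illustration of the idea; mapWithConcurrency and fakeCall are hypothetical names, and fakeCall stands in for a real API call.

```javascript
// Runs mapper(item) for every item with at most `limit` calls in flight,
// and resolves with the results in input order.
function mapWithConcurrency(items, mapper, limit) {
    const results = new Array(items.length);
    let next = 0;

    // Each worker keeps pulling the next unprocessed index until none remain.
    async function worker() {
        while (next < items.length) {
            const index = next++;
            results[index] = await mapper(items[index], index);
        }
    }

    const workerCount = Math.max(1, Math.min(limit, items.length));
    const workers = Array.from({ length: workerCount }, () => worker());
    return Promise.all(workers).then(() => results);
}

// Hypothetical stand-in for an API call; tracks how many calls overlap.
let inFlight = 0;
let maxInFlight = 0;
function fakeCall(n) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise((resolve) => setTimeout(() => {
        inFlight--;
        resolve(n * 2);
    }, 5));
}

const demo = mapWithConcurrency([1, 2, 3, 4, 5, 6, 7], fakeCall, 3)
    .then((results) => {
        console.log('results:', results, 'max in flight:', maxInFlight);
        return results;
    });
```

The key point is that the mapper is only called when a worker is free, which is possible only because the input is data rather than already-started promises.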


Your effort to use Promise.map() was passing an array of promises, which does not help you since the promises represent async operations that have already been started:

Promise.map(promises, function() {
    ...
});
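
One way to get from promises to data in your case is to have the nested loops collect plain descriptors instead of calling getMethodResponse(). The sketch below is an assumption about how that could look, not a drop-in replacement: collectTasks and samplePaths are hypothetical, and it uses plain loops rather than lodash so that it runs on its own.

```javascript
// Walks a Swagger `paths` object and returns one plain descriptor for every
// response whose status code is in the 200-204 range. No API is called here.
function collectTasks(paths) {
    const tasks = [];
    for (const [resourcePath, methods] of Object.entries(paths)) {
        for (const [resourceMethod, definition] of Object.entries(methods)) {
            // Entries without a `responses` object are skipped.
            const responses = (definition && definition.responses) || {};
            for (const statusCode of Object.keys(responses)) {
                const code = Number(statusCode);   // NaN for keys like "default"
                if (code >= 200 && code <= 204) {
                    tasks.push({ resourceMethod, statusCode, resourcePath });
                }
            }
        }
    }
    return tasks;
}

// Hypothetical sample input, shaped like the `paths` section of a Swagger file.
const samplePaths = {
    '/pets': {
        get: { responses: { '200': {}, '404': {} } },
        post: { responses: { '201': {}, default: {} } },
    },
};

console.log(collectTasks(samplePaths));
```

The resulting array is what you would hand to Promise.map(), with a mapper that calls getMethodResponse(task.resourceMethod, task.statusCode, apiName, task.resourcePath) and a concurrency option. That only limits anything if getMethodResponse() returns a promise covering all of its work; in the posted code the inner apigateway.getMethodAsync() call is not returned from the .then() handler, so its completion is invisible to the caller.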


In addition, you really need to find out exactly what causes the TooManyRequestsException error, either by reading the documentation for the target API or by doing a good deal of testing. Several different things can trigger it, and without knowing exactly what you need to control, you are left making wild guesses about what might work. The most common things that an API might detect are:

  1. Simultaneous requests from the same account or source.
  2. Requests per unit of time from the same account or source (such as requests per second).

The concurrency option in Promise.map() will easily help you with the first case, but will not necessarily help with the second, because you can limit yourself to a low number of simultaneous requests and still exceed a requests-per-second limit. The second needs some actual time control. Inserting delay() statements will sometimes work, but it is not a very direct way of managing it and will lead either to inconsistent control (something that works sometimes, but not other times) or to sub-optimal control (limiting yourself to something far below what you can actually use).

To stay under a requests-per-second limit, you need some actual time control, either with a rate limiting library or with actual rate limiting logic in your own code.
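
As a rough illustration of what "rate limiting logic in your own code" can mean, the sketch below spaces out the start times of calls by a minimum interval. It is a simplified assumption of mine, not the scheme from the linked answer and not a library API; createRateLimiter is a hypothetical name, and the 20 ms interval is an arbitrary demo value, not an AWS limit.

```javascript
// Returns a `schedule` function. Each task passed to it is started no sooner
// than `minIntervalMs` after the previously scheduled task's start slot.
function createRateLimiter(minIntervalMs) {
    let nextSlot = 0;   // earliest timestamp (ms) at which the next task may start

    return function schedule(task) {
        const now = Date.now();
        const startAt = Math.max(now, nextSlot);
        nextSlot = startAt + minIntervalMs;
        // Wait until this task's slot, then run it and pass its result through.
        return new Promise((resolve) => setTimeout(resolve, startAt - now))
            .then(() => task());
    };
}

// Demo: five tasks scheduled at once, started roughly 20 ms apart.
const schedule = createRateLimiter(20);
const t0 = Date.now();
Promise.all([1, 2, 3, 4, 5].map((n) => schedule(() => {
    console.log(`task ${n} started after ~${Date.now() - t0} ms`);
    return n;
})));
```

Because each task is passed as a function, the limiter decides when it starts. This caps the request rate but not the number of requests in flight, so it complements a concurrency limit rather than replacing it.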

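Here's an example of a scheme for limiting the number of requests per second you are making: How to Manage Requests to Stay Below Rate Limiting (https://stackoverflow.com/questions/36730745/choose-proper-async-method-for-batch-processing/36736593#36736593).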
