Batch requests in Node.js


Question

My program is communicating with a web service that only accepts ~10 requests per second. From time to time, my program sends 100+ concurrent requests to the web service, causing my program to crash.

How do I limit concurrent requests in Node.js to 5 per second? I'm using the request library.

// IF EVENT AND SENDER
if (data.sender[0].events && data.sender[0].events.length > 0) {

    // FIND ALL EVENTS
    for (var i = 0; i < data.sender[0].events.length; i++) {

        // IF TYPE IS "ADDED"
        if (data.sender[0].events[i].type == "added") {
            switch (data.sender[0].events[i].link.rel) {
                case "contact":
                    batch("added", data.sender[0].events[i].link.href);
                    //_initContacts(data.sender[0].events[i].link.href);
                    break;
            }
        // IF TYPE IS "UPDATED"
        } else if (data.sender[0].events[i].type == "updated") {
            switch (data.sender[0].events[i].link.rel) {
                case "contactPresence":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactPresence(data.sender[0].events[i].link.href);
                    break;
                case "contactNote":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactNote(data.sender[0].events[i].link.href);
                    break;
                case "contactLocation":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactLocation(data.sender[0].events[i].link.href);
                    break;
                case "presenceSubscription":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_extendPresenceSubscription(data.sender[0].events[i].link.href);
                    break;
            }
        }
    }
}

And then the homegrown batch method:

var updated = [];
var added = [];

var batch = function (type, url) {
    console.log("batch called");

    if (type === "added") {
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            var addedBatch = added;   // capture and reset the current batch
            added = [];
            // flush the whole batch after a 2 second delay
            setTimeout(function () {
                addedBatch.forEach(function (req) {
                    _initContacts(req);
                });
            }, 2000);
        }
    }
    else if (type === "updated") {
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5) {
            console.log("Over 5 updated events");
            var updatedBatch = updated;   // capture and reset the current batch
            updated = [];
            // flush each item of the batch after a 2 second delay
            updatedBatch.forEach(function (req) {
                setTimeout(function () {
                    _getContactLocation(req);
                }, 2000);
            });
        }
    }
};

And an example of an actual request:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};


Answer

Using the async library, the mapLimit function does exactly what you want. I can't provide an example for your specific use case as you did not provide any code.

From the readme:

The same as map, only no more than "limit" iterators will be simultaneously running at any time.

Note that the items are not processed in batches, so there is no guarantee that the first "limit" iterator functions will complete before any others are started.

Arguments

  • arr - An array to iterate over.
  • limit - The maximum number of iterators to run at any time.
  • iterator(item, callback) - A function to apply to each item in the array. The iterator is passed a callback(err, transformed) which must be called once it has completed with an error (which can be null) and a transformed item.
  • callback(err, results) - A callback which is called after all the iterator functions have finished, or an error has occurred. Results is an array of the transformed items from the original array.

Example


async.mapLimit(['file1','file2','file3'], 1, fs.stat, function (err, results) {
    // results is now an array of stats for each file
});

Edit: Now that you have provided code, I see that your use is a bit different from what I assumed. The async library is more useful when you know all the tasks to run up front. I don't know of a library off hand that will easily solve this for you. The note above is likely still relevant to people searching this topic, so I'll leave it in.
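For cases where the full list of URLs is known up front, a rough, untested sketch with mapLimit could look like the following. It assumes an hrefs array holding the event link.href values, plus the r (request), baseUrl, accessToken and self variables from the question:

var async = require("async");

// Run at most 5 requests at a time over a known list of hrefs.
async.mapLimit(hrefs, 5, function (href, callback) {
    r.get(baseUrl + href,
        { "strictSSL": false, "headers": { "Authorization": "Bearer " + accessToken } },
        function (err, res, body) {
            if (err) return callback(err);
            callback(null, JSON.parse(body));   // pass the parsed body back as the transformed item
        });
}, function (err, results) {
    if (err) return console.log(err);
    // results holds the parsed bodies, in the same order as hrefs
    results.forEach(function (data) {
        self.emit("data.contact", data);
    });
});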

Sorry, I don't have time to restructure your code, but here is an (untested) example of a function that makes asynchronous requests while limiting itself to at most 5 concurrent requests. I would highly recommend working from this to come up with a more general solution that fits your code base.

var throttledRequest = (function () {
    var queue = [],    // URLs waiting to be requested
        running = 0;   // number of requests currently in flight

    function sendPossibleRequests() {
        var url;
        // start queued requests until 5 are in flight
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                running--;
                sendPossibleRequests();   // a slot has freed up, start the next queued request

                if (err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();

Basically, you keep a queue of all the data to be processed asynchronously (such as URLs to be requested), and then after each callback from a request you try to launch as many of the remaining requests as possible.
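As a hypothetical usage sketch (the hrefs below are placeholders, not values from the original code), every link coming out of the event loop can simply be funneled through the throttled function instead of calling _getContactLocation or _initContacts directly:

// Hypothetical usage: assumes baseUrl and throttledRequest from above are in scope.
["/contacts/1/location", "/contacts/2/location", "/contacts/3/location"]
    .forEach(function (href) {
        throttledRequest(baseUrl + href);   // never more than 5 requests in flight
    });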

