Node.js sqs队列处理器 [英] Nodejs sqs queue processor

查看:105
本文介绍了Node.js sqs队列处理器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写一个nodejs sqs队列处理器.

I am trying to write a nodejs sqs queue processor.

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) {
      if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              throw error;
            }
            console.log('stdout: ' + stdout);
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              });
            }
          });
      }
    }
  });
}
readMessage();

上面的代码对于队列中的单个消息工作正常.我应该如何编写此脚本,以便在处理所有消息之前一直轮询队列中的消息?我应该使用设置超时时间吗?

The above code works fine for single message in queue. How should I write this script so that it keeps polling for messages in queue untill all messages are processed? Should i use set timeout?

推荐答案

首先,您应该明确使用Amazon提供的长轮询技术,据我了解,您已经在使用它,因为sqs.receiveMessage调用中的"WaitTimeSeconds": 20自变量.希望您不要忘记在 AWS Web界面中进行配置a>.

First of all you should definetely use long polling technique provided by Amazon, and as I understand you are already using it because you have "WaitTimeSeconds": 20 argument in sqs.receiveMessage call. I hope that you didn't forget to configure it in the AWS Web interface.

关于轮询消息-您可能会使用不同的技术,包括计时器,但我认为最简单的方法是在receiveMessage(甚至是exec)回调的末尾调用readMessage()函数功能.因此,队列中上一条消息的处理结束后,将立即开始处理(或等待)队列中的下一条消息.

About polling for messages - you may use different techniques including timers, but I think the most simple will be just call your readMessage() function at the end of receiveMessage's (or even exec's) callback function. So processing of (or waiting for) the next message in queue will start immediately after the end of processing of previous message in queue.

更新:

对于我来说,在您的新版本代码中,有许多readMessage()调用.我认为最好将其最小化以使代码更清晰和易于维护.但是,例如,如果您在主receiveMessage回调的末尾仅留下一个调用,您将收到许多并行运行的PHP工作程序脚本-从性能的角度来看,这可能还不错-但是您将必须添加一些复杂的脚本来控制并行工作程序的数量.我认为您可以在exec回调中减少一些调用,尝试加入if并在主回调中加入调用.

As for me in your new version of code there are to many readMessage() calls. I think it is better to minimize it to keep code more clear and easy to maintain. But if you leave, for example, the only one call at the end of your main receiveMessage callback you will recieve a lot of PHP worker scripts running in parallel - and maybe it is not so bad from the point of view of performance - but you will have to add some complicated script to control the amount of parallel workers. I think you can cut some calls in exec callback, try to join ifs and join calls in main callback.

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var delay = 20 * 1000;
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) 
      && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              // error handling 
            }
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              }, function(err, data){                
              });
            }
            readMessage();                
          });
      }          
    }        
    readMessage();        
  });
}
readMessage();

关于内存泄漏:我认为您不必担心,因为readMessage()的下一次调用发生在回调函数中-因此不是递归的,并且递归调用的函数会在调用receiveMessage()函数之后将值返回给父函数.

About memory leaks: I think that you should not worry because the next call of readMessage() happens in callback function - so not recursively, and recursively called function returns value to parent function just after calling receiveMessage() function.

这篇关于Node.js sqs队列处理器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆