Nodejs sqs队列处理器 [英] Nodejs sqs queue processor

查看:25
本文介绍了Nodejs sqs队列处理器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写一个 nodejs sqs 队列处理器.

I am trying to write a nodejs sqs queue processor.

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) {
      if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              throw error;
            }
            console.log('stdout: ' + stdout);
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              });
            }
          });
      }
    }
  });
}
readMessage();

上述代码适用于队列中的单个消息.我应该如何编写此脚本,以便它不断轮询队列中的消息,直到处理完所有消息?我应该使用设置超时吗?

The above code works fine for single message in queue. How should I write this script so that it keeps polling for messages in queue untill all messages are processed? Should i use set timeout?

推荐答案

首先你应该明确地使用亚马逊提供的长轮询技术,据我所知你已经在使用它,因为你有"WaitTimeSeconds": 20 sqs.receiveMessage 调用中的参数.我希望您没有忘记在 AWS Web 界面.

First of all you should definetely use long polling technique provided by Amazon, and as I understand you are already using it because you have "WaitTimeSeconds": 20 argument in sqs.receiveMessage call. I hope that you didn't forget to configure it in the AWS Web interface.

关于消息轮询 - 您可以使用不同的技术,包括计时器,但我认为最简单的方法是在 receiveMessage 的末尾调用您的 readMessage() 函数的(甚至 exec 的)回调函数.因此,队列中下一条消息的处理(或等待)将在队列中前一条消息的处理结束后立即开始.

About polling for messages - you may use different techniques including timers, but I think the most simple will be just call your readMessage() function at the end of receiveMessage's (or even exec's) callback function. So processing of (or waiting for) the next message in queue will start immediately after the end of processing of previous message in queue.

更新:

就我而言,在您的新版本代码中,有很多 readMessage() 调用.我认为最好将其最小化以使代码更清晰且易于维护.但是,例如,如果您离开主 receiveMessage 回调结束时的唯一一个调用,您将收到许多并行运行的 PHP 工作脚本 - 从这一点来看也许还不错从性能的角度来看 - 但是您必须添加一些复杂的脚本来控制并行工作线程的数量.我认为您可以在 exec 回调中减少一些调用,尝试加入 if 并在主回调中加入调用.

As for me in your new version of code there are to many readMessage() calls. I think it is better to minimize it to keep code more clear and easy to maintain. But if you leave, for example, the only one call at the end of your main receiveMessage callback you will recieve a lot of PHP worker scripts running in parallel - and maybe it is not so bad from the point of view of performance - but you will have to add some complicated script to control the amount of parallel workers. I think you can cut some calls in exec callback, try to join ifs and join calls in main callback.

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var delay = 20 * 1000;
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) 
      && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              // error handling 
            }
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              }, function(err, data){                
              });
            }
            readMessage();                
          });
      }          
    }        
    readMessage();        
  });
}
readMessage();

关于内存泄漏:我认为你不必担心,因为 readMessage() 的下一次调用发生在回调函数中 - 所以不是递归的,并且递归调用的函数在调用后立即将值返回给父函数receiveMessage() 函数.

About memory leaks: I think that you should not worry because the next call of readMessage() happens in callback function - so not recursively, and recursively called function returns value to parent function just after calling receiveMessage() function.

这篇关于Nodejs sqs队列处理器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆