使用 Node.js 从 Azure WebJob 轮询 Azure 服务总线队列 [英] Polling an Azure Service Bus Queue from an Azure WebJob using Node.js

查看:31
本文介绍了使用 Node.js 从 Azure WebJob 轮询 Azure 服务总线队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

尝试使用用 Node.js 编写的 WebJob 轮询 Azure 服务总线队列.我创建了 2 个 WebJobs.第一个是按需向队列发送 10 条唯一消息.第二个作业是连续的并轮询队列中的消息.

遇到以下问题:

  1. 轮询很慢.接收 10 条消息平均需要大约 10 分钟.请参阅下面的示例日志详细信息.在这个速度下基本无法使用.所有的延迟都来自 receiveQueueMessage 的响应.响应时间从 0 秒到 ~120 秒不等,平均为 60 秒.

  2. 消息是按随机顺序接收的.不是先进先出.

  3. 有时会收到两次消息,即使它们是在 ReceiveAndDelete 模式下读取的(我尝试过不使用默认为 ReceiveAndDelete 的读取模式参数,使用 {isReceiveAndDelete:true} 和与 {isPeekLock:false} 的结果相同).

  4. 当队列为空时,它应该保持接收请求打开一天,但它总是在 230 秒后返回无消息错误.根据文档,最长为 24 天,所以我不知道 230 秒是从哪里来的:

<块引用>

服务总线中阻塞接收操作的最大超时排队时间为 24 天.但是,基于 REST 的超时具有最大值55 秒.

基本上没有什么像宣传的那样有效.我做错了什么?

发送消息测试作业:

var uuid = require('node-uuid');var azure = 要求('天蓝色');var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);var messagesToSend = 10;发送消息(0);函数发送消息(计数){变量消息 = {正文:'测试消息',自定义属性:{message_number:计数,sent_date:新日期},经纪人属性:{MessageId: uuid.v4()//确保服务总线不认为这是重复消息}};serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {如果(!错误){console.log('发送的测试消息编号' + count.toString());} 别的 {console.error('错误发送消息:' + err);}});//等待5秒以确保服务总线以正确的顺序接收消息if (count < messagesToSend) {设置超时(函数(新计数){//发送下一条消息发送消息(新计数);}, 5000, 计数+1);}}

接收消息连续作业:

console.log('listener job started');var azure = 要求('天蓝色');var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);听消息(服务总线);功能listenForMessages(服务总线){var start = process.hrtime();var timeOut = 60*60*24;//1天的长投票serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {var end = process.hrtime(start);console.log('在 %ds 秒内收到响应', end[0]);如果(错误){console.log('错误请求消息:' + err);听消息(服务总线);} 别的 {if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {console.log('收到的测试消息编号' + message.customProperties.message_number.toString());听消息(服务总线);} 别的 {console.log('收到无效消息');听消息(服务总线);}}});}

样本日志输出:

[05/06/2015 21:50:14 >8c2504:SYS INFO] 状态更改为正在运行[05/06/2015 21:50:14 >8c2504: INFO] 侦听器作业已启动[05/06/2015 21:51:23 >8c2504: INFO] 在 1s 秒内收到响应[05/06/2015 21:51:23 >8c2504: INFO] 收到测试消息编号 0[05/06/2015 21:51:25 >8c2504: INFO] 在 2s 秒内收到响应[05/06/2015 21:51:26 >8c2504: INFO] 收到测试消息编号 4[05/06/2015 21:51:27 >8c2504: INFO] 在 1s 秒内收到响应[05/06/2015 21:51:27 >8c2504: INFO] 收到测试消息编号 7[05/06/2015 21:51:28 >8c2504: INFO] 在 0s 秒内收到响应[05/06/2015 21:51:29 >8c2504: INFO] 收到测试消息编号 9[05/06/2015 21:51:49 >8c2504: INFO] 在 20 秒内收到响应[05/06/2015 21:51:49 >8c2504: INFO] 收到测试消息编号 1[05/06/2015 21:53:35 >8c2504: INFO] 在 106 秒内收到响应[05/06/2015 21:53:35 >8c2504: INFO] 收到测试消息编号 1[05/06/2015 21:54:26 >8c2504: INFO] 在 50 秒内收到响应[05/06/2015 21:54:26 >8c2504: INFO] 收到测试消息编号 5[05/06/2015 21:54:35 >8c2504: INFO] 在 9s 秒内收到响应[05/06/2015 21:54:35 >8c2504: INFO] 收到测试消息编号 9[05/06/2015 21:55:28 >8c2504: INFO] 在 53s 秒内收到响应[05/06/2015 21:55:28 >8c2504: INFO] 收到测试消息号 2[05/06/2015 21:57:26 >8c2504: INFO] 在 118 秒内收到响应[05/06/2015 21:57:26 >8c2504: INFO] 收到测试消息编号 6[05/06/2015 21:58:28 >8c2504: INFO] 在 61 秒内收到响应[05/06/2015 21:58:28 >8c2504: INFO] 收到测试消息编号 8[05/06/2015 22:00:35 >8c2504: INFO] 在 126 秒内收到响应[05/06/2015 22:00:35 >8c2504: INFO] 收到测试消息号 3[05/06/2015 22:04:25 >8c2504: INFO] 在 230 秒内收到响应[05/06/2015 22:04:25 >8c2504:INFO] 请求消息时出错:没有要接收的消息[05/06/2015 22:08:16 >8c2504: INFO] 在 230 秒内收到响应[05/06/2015 22:04:25 >8c2504:INFO] 请求消息时出错:没有要接收的消息

解决方案

问题是我使用的队列被分区(在 Azure 门户中创建队列时的默认选项).一旦我创建了一个未分区的新队列,一切都按预期工作而没有延迟(除了长轮询尝试中奇怪的 230 秒超时).所以基本上 node.js 库不适用于分区队列.完全没有.浪费了很多天来弄清楚这一点.将这里留给其他人.

Trying to poll an Azure Service Bus Queue using a WebJob written in Node.js. I created 2 WebJobs. The first is on demand and sends 10 unique messages to the queue. The second job is continuous and polls the queue for messages.

Encountering the following issues:

  1. The polling is SLOW. It takes an average of about 10 minutes to receive 10 messages. See sample log details below. Basically unusable at this speed. All the delay is from getting a response from receiveQueueMessage. Response times vary from 0 seconds to ~120 seconds, with an average of 60 seconds.

  2. The messages are being received in a random order. Not FIFO.

  3. Sometimes messages are received twice, even though they are being read in ReceiveAndDelete mode (I have tried with no read mode parameter which should default to ReceiveAndDelete, with {isReceiveAndDelete:true} and with {isPeekLock:false} with the same results).

  4. When the queue is empty, it should keep the receive request open for a day, but it always returns with a no message error after 230 seconds. According to the documentation the max is 24 days so I don't know where 230 seconds is coming from:

The maximum timeout for a blocking receive operation in Service Bus queues is 24 days. However, REST-based timeouts have a maximum value of 55 seconds.

Basically nothing works as advertised. What am I doing wrong?

Send Message Test Job:

var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;

sendMessage(0);

function sendMessage(count)
{
    var message = {
        body: 'test message',
        customProperties: {
            message_number: count,
            sent_date: new Date
        },
        brokerProperties: {
            MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
        }
    };

    serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {

        if (!err) {
            console.log('sent test message number ' + count.toString());
        } else {
            console.error('error sending message: ' + err);
        }

    });

    //wait 5 seconds to ensure messages are received by service bus in correct order
    if (count < messagesToSend) {
        setTimeout(function(newCount) {
            //send next message
            sendMessage(newCount);
        }, 5000, count+1);
    }
}    

Receive Message Continuous Job:

console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);

function listenForMessages(serviceBus)
{
    var start = process.hrtime();
    var timeOut = 60*60*24; //long poll for 1 day
    serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {

        var end = process.hrtime(start);
        console.log('received a response in %ds seconds', end[0]);

        if (err) {

            console.log('error requesting message: ' + err);
            listenForMessages(serviceBus);

        } else {

            if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {

                console.log('received test message number ' + message.customProperties.message_number.toString());
                listenForMessages(serviceBus);

            } else {

                console.log('invalid message received');
                listenForMessages(serviceBus);

            }

        }

    });
}

Sample Log Output:

[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds    
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive

解决方案

And the issue was the queue I was using was partitioned (the default option when creating a queue in the Azure portal). Once I created a new queue that was not partitioned, everything worked as expected without the lag (other than the weird 230 second timeout on a long poll attempt). So basically the node.js library doesn't work for partitioned queues. At all. Wasted many days figuring that one out. Will leave this here for others.

这篇关于使用 Node.js 从 Azure WebJob 轮询 Azure 服务总线队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆