How to parallelize an Azure worker role?


Problem Description

I have got a Worker Role running in azure.

This worker processes a queue containing a large number of integers. For each integer I have to do fairly long processing (from 1 second to 10 minutes depending on the integer).

As this is quite time consuming, I would like to do this processing in parallel. Unfortunately, my parallelization does not seem to be efficient when I test with a queue of 400 integers.

Here is my implementation:

  public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
        private readonly Manager _manager = Manager.Instance;
        private static readonly LogManager logger = LogManager.Instance;

        public override void Run() {
            logger.Info("Worker is running");

            try {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            catch (Exception e) {
                logger.Error(e, 0, "Error Run Worker: " + e);
            }
            finally {
                this.runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            bool result = base.OnStart();

            logger.Info("Worker has been started");

            return result;
        }

        public override void OnStop() {
            logger.Info("Worker is stopping");

            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();

            base.OnStop();

            logger.Info("Worker has stopped");
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            while (!cancellationToken.IsCancellationRequested) {
                try {
                    _manager.ProcessQueue();
                }
                catch (Exception e) {
                    logger.Error(e, 0, "Error RunAsync Worker: " + e);
                }
            }
            await Task.Delay(1000, cancellationToken);

        }
    }

And the implementation of the ProcessQueue:

    public void ProcessQueue() {
        try {
            _queue.FetchAttributes();

            int? cachedMessageCount = _queue.ApproximateMessageCount;

            if (cachedMessageCount != null && cachedMessageCount > 0) {
                var listEntries = new List<CloudQueueMessage>();

                listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));

                Parallel.ForEach(listEntries, ProcessEntry);
            }
        }
        catch (Exception e) {
            logger.Error(e, 0, "Error ProcessQueue: " + e);
        }
    }

And ProcessEntry

    private void ProcessEntry(CloudQueueMessage entry) {
        try {
            int id = Convert.ToInt32(entry.AsString);

            Service.GetData(id);

            _queue.DeleteMessage(entry);

        }
        catch (Exception e) {
            _queueError.AddMessage(entry);
            _queue.DeleteMessage(entry);
            logger.Error(e, 0, "Error ProcessEntry: " + e);
        }
    }

In the ProcessQueue function, I tried different values of MAX_ENTRIES: first 20, then 2. It seems to be slower with MAX_ENTRIES = 20, but whatever the value of MAX_ENTRIES is, it still seems quite slow.
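One knob worth experimenting with here, since Service.GetData looks I/O-bound rather than CPU-bound, is ParallelOptions.MaxDegreeOfParallelism: by default Parallel.ForEach is tuned for CPU-bound work and may not fan out much beyond the 2 cores of an A2 instance. A minimal, self-contained sketch (the Thread.Sleep is a hypothetical stand-in for Service.GetData; the class and method names are illustrative only):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelismDemo
{
    // Processes "count" fake messages with at most maxDegree concurrent
    // workers and returns how many were processed.
    public static int Run(int count, int maxDegree)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
        int processed = 0;
        Parallel.ForEach(new int[count], options, _ =>
        {
            Thread.Sleep(10);                 // stand-in for Service.GetData(id)
            Interlocked.Increment(ref processed);
        });
        return processed;
    }

    static void Main() => Console.WriteLine(Run(20, 4));
}
```

For long blocking calls like these, it may pay to raise the degree well above the core count, or to move to task-based asynchrony instead of blocking threads.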

My VM is an A2 medium.

I really don't know if I am doing the parallelization correctly; maybe the problem comes from the worker itself (it may be hard to run this in parallel).

Recommended Answer

You haven't mentioned which Azure messaging/queuing technology you are using; however, for tasks where I want to process multiple messages in parallel I tend to use the Message Pump pattern on Service Bus Queues and Subscriptions, leveraging the OnMessage() method available on both the Service Bus Queue and Subscription clients:


  • QueueClient.OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.onmessage.aspx

  • SubscriptionClient.OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.subscriptionclient.onmessage.aspx

  • An overview of how this stuff works :-) - http://fabriccontroller.net/blog/posts/introducing-the-event-driven-message-programming-model-for-the-windows-azure-service-bus/

From MSDN:

When calling OnMessage(), the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call.
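The pump described in that excerpt can be illustrated without any Service Bus API at all: an endless receive loop where a timed-out receive simply triggers the next one. A minimal in-memory sketch, with BlockingCollection standing in for the broker (everything here is illustrative, not Service Bus code):

```csharp
using System;
using System.Collections.Concurrent;

static class MessagePumpSketch
{
    // Drains the queue the way the Service Bus pump does: a loop of timed
    // receive calls, where a timeout simply leads to the next receive.
    public static int Pump(BlockingCollection<string> queue)
    {
        int received = 0;
        while (!queue.IsCompleted)
        {
            if (queue.TryTake(out string msg, TimeSpan.FromMilliseconds(50)))
                received++;               // the delegate would process msg here
        }
        return received;
    }

    static void Main()
    {
        var queue = new BlockingCollection<string>();
        foreach (var m in new[] { "1", "2", "3" }) queue.Add(m);
        queue.CompleteAdding();
        Console.WriteLine(Pump(queue));
    }
}
```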

This pattern allows you to use a delegate (or anonymous function in my preferred case) that handles the receipt of the Brokered Message instance on a separate thread on the WaWorkerHost process. In fact, to increase the level of throughput, you can specify the number of threads that the Message Pump should provide, thereby allowing you to receive and process 2, 4, 8 messages from the queue in parallel. You can additionally tell the Message Pump to automagically mark the message as complete when the delegate has successfully finished processing the message. Both the thread count and AutoComplete instructions are passed in the OnMessageOptions parameter on the overloaded method.

public override void Run()
{
    var onMessageOptions = new OnMessageOptions()
    {
        AutoComplete = true, // Message-Pump will call Complete on messages after the callback has completed processing.
        MaxConcurrentCalls = 2 // Max number of threads the Message-Pump can spawn to process messages.
    };

    sbQueueClient.OnMessage((brokeredMessage) =>
    {

        // Process the Brokered Message Instance here

    }, onMessageOptions);

    RunAsync(_cancellationTokenSource.Token).Wait();
}

You can still leverage the RunAsync() method to perform additional tasks on the main Worker Role thread if required.
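Note that sbQueueClient is assumed to already exist in the snippet above; a hypothetical way to create it with the legacy WindowsAzure.ServiceBus package (the connection string and queue name below are placeholders, not values from the question) would be:

```csharp
using Microsoft.ServiceBus.Messaging;

// Placeholder values - substitute your own namespace connection string and queue name.
string connectionString =
    "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>";
QueueClient sbQueueClient =
    QueueClient.CreateFromConnectionString(connectionString, "worker-queue");
```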

Finally, I would also recommend that you look at scaling your Worker Role instances out to a minimum of 2 (for fault tolerance and redundancy) to increase your overall throughput. From what I have seen with multiple production deployments of this pattern, OnMessage() performs perfectly when multiple Worker Role Instances are running.
