流程队列多线程或任务 [英] Process queue with multithreading or tasks

查看:130
本文介绍了流程队列多线程或任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我具有其中有许多许多消息成为processed.Because电话端口限定一个电话消息应用程序,所以该消息将先进先出处理。每个消息有一个标志确认,表示任何处理。它被初始化为假的,当然。

我希望把所有的消息到队列中,然后用多线程或者任务的处理。

 公共类的MessageQueue
    {
        公共队列MessageWorkItem {搞定;组; }
        公开消息消息{搞定;组; }
        公众的MessageQueue()
        {
            MessageWorkItem =新队列();
            消息=新消息();
        }
        公共无效GetMessageMetaData()
        {
            尝试
            {
                //这只是一个测试,只添加一个项目到队列
                Message.MessageID = Guid.NewGuid();
                Message.NumberToCall =1111111111;
                Message.FacilityID =3333;
                Message.NumberToDial =2222222222;
                Message.Country code =1;
                Message.Acknowledge = FALSE;
            }
            赶上(异常前)
            {
            }
        }        公共无效AddingItemToQueue()
        {
            GetMessageMetaData();
            如果(!Message.Acknowledge)
            {
                锁定(MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(消息);
                }
            }
        }
    }    公共类信息
    {
        公众的Guid的MessageID {搞定;组; }
        公共字符串NumberToCall {搞定;组; }
        公共字符串FacilityID {搞定;组; }
        公共字符串NumberToDial {搞定;组; }
        公共字符串国家code {搞定;组; }
        公共BOOL确认{搞定;组; }
    }

现在我的问题是如何从队列中的多线程出队的项目。
对于从队列中的每一个项目,我要运行脚本。

 公共无效了RunScript(信息项)
        {
            尝试
            {
                PlayMessage(项目);
                返回;
            }
            赶上(HangupException十六进制)
            {
                Log.WriteWithId(呼叫者Hungup!,hex.Message);
            }
            赶上(异常前)
            {
                Log.WriteException(例如,意外的异常:{0});
            }
        }

我的想法是,看看


  

如果(MessageWorkItem.Count> = 1)
  然后做一些事情,但我确实需要code的帮助。



解决方案

如果你可以使用.NET 4.5,我建议在看的从数据流的任务并行库(TPL)

这网页导致了许多实例演练等的如何:实现一个生产者 - 消费者模式的数据流和演练:在Windows中使用数据流窗体应用程序

有一个在该文档,看它是否会帮助你。这是相当多的采取,但我认为这很可能是你的最好的办法。

另外,你可以考虑使用 BlockingCollection 连同其 GetConsumingEnumerable()方法来访问队列中的项目。

你做的是对工作分成要以某种方式来处理对象,并使用BlockingCollection来管理队列。

使用整数部分样品code ,而不是作为对象的工作项目将有助于证明这一点:

当一个工作线程完成了它目前的项目,它会删除从工作队列中一个新的项目,流程,项目,然后将其添加到输出队列。

一个独立的消费者线程将删除完成了从输出队列项目,做了他们。

最后,我们必须等待所有的工人来完成(Task.WaitAll(工人))之前,我们可以标记输出队列为已完成(outputQueue.CompleteAdding())。

 使用系统;
使用System.Collections.Concurrent;
使用的System.Threading;
使用System.Threading.Tasks;命名空间演示
{
    类节目
    {
        静态无效的主要(字串[] args)
        {
            新计划()的run()。
        }        无效的run()
        {
            INT THREADCOUNT = 4;
            任务[] =工人工作新[THREADCOUNT]            Task.Factory.StartNew(消费者);            的for(int i = 0; I< THREADCOUNT ++ I)
            {
                INT workerId = I;
                任务task =新建任务(()=>工人(workerId));
                工人[I] =任务;
                task.Start();
            }            的for(int i = 0; I< 100; ++ I)
            {
                Console.WriteLine(排队工作项{0},我);
                inputQueue.Add(ⅰ);
                Thread.sleep代码(50);
            }            Console.WriteLine(停止增加。);
            inputQueue.CompleteAdding();
            Task.WaitAll(工人);
            outputQueue.CompleteAdding();
            Console.WriteLine(完成。);            到Console.ReadLine();
        }        无效工人(INT workerId)
        {
            Console.WriteLine(工人{0}正在启动。workerId);            的foreach(在inputQueue.GetConsumingEnumerable变种的工作项目())
            {
                Console.WriteLine(工人{0}正在处理项目{1},workerId,工作项目);
                Thread.sleep代码(100); //模拟工作。
                outputQueue.Add(工作项目); //输出完成项目。
            }            Console.WriteLine(工人{0}正在停止。workerId);
        }        无效消费()
        {
            Console.WriteLine(消费者正在启动。);            的foreach(在outputQueue.GetConsumingEnumerable变种的工作项目())
            {
                Console.WriteLine(消费者正在使用的项目{0},工作项目);
                Thread.sleep代码(25);
            }            Console.WriteLine(消费者就完成了。);
        }        BlockingCollection< INT> inputQueue =新BlockingCollection<&诠释GT;();
        BlockingCollection< INT> outputQueue =新BlockingCollection<&诠释GT;();
    }
}

I have a telephony message application in which there are many many messages to be processed.Because telephone ports are limited, so the message will be processed first in first out. Each message has a flag 'Acknowledge' that indicates whichever is processed. It was initialized as false of course.

I want to put all messages into a queue then process them with multiple threads or tasks.

    public class MessageQueue
    {
        public Queue MessageWorkItem { get; set; }
        public Messages Message { get; set; }
        public MessageQueue()
        {
            MessageWorkItem = new Queue();
            Message = new Messages();
        }
        public void GetMessageMetaData()
        {
            try
            {
                // It is just a test, add only one item into the queue
                Message.MessageID = Guid.NewGuid();
                Message.NumberToCall = "1111111111";
                Message.FacilityID = "3333";
                Message.NumberToDial = "2222222222";
                Message.CountryCode = "1";
                Message.Acknowledge = false;
            }
            catch (Exception ex)
            {
            }
        }

        public void AddingItemToQueue()
        {
            GetMessageMetaData();
            if (!Message.Acknowledge)
            {
                lock (MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(Message);
                }
            }
        }
    }

    public class Messages
    {
        public Guid MessageID { get; set; }
        public string NumberToCall { get; set; }
        public string FacilityID { get; set; }
        public string NumberToDial { get; set; }
        public string CountryCode { get; set; }
        public bool Acknowledge { get; set; }
    }

Now my question is how to dequeue the item from the queue with multithreading. For the each item from the queue, I want to run a script.

        public void RunScript(Message item)
        {
            try
            {
                PlayMessage(item); 
                return;
            }
            catch (HangupException hex)
            {
                Log.WriteWithId("Caller Hungup!", hex.Message);
            }
            catch (Exception ex)
            {
                Log.WriteException(ex, "Unexpected exception: {0}");
            }
        }

What I thought was to see if

if(MessageWorkItem.Count >= 1) Then doing something but I do need code help.

解决方案

If you can use .Net 4.5, I'd suggest looking at Dataflow from the the Task Parallel Library (TPL).

That page leads to a lot of example walkthroughs such as How to: Implement a Producer-Consumer Dataflow Pattern and Walkthrough: Using Dataflow in a Windows Forms Application.

Have a look at that documentation to see if it would help you. It's quite a lot to take in, but I think it would probably be your best approach.

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access items in the queue.

What you do is to split up the work into objects that you want to process in some way, and use a BlockingCollection to manage the queue.

Some sample code using ints rather than objects as the work items will help to demonstrate this:

When a worker thread has finished with it's current item, it will remove a new item from the work queue, process that item, then add it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding()).

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

            Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}

这篇关于流程队列多线程或任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆