具有多线程的ConcurrentQueue [英] ConcurrentQueue with multithreading
问题描述
我是多线程概念的新手.我需要将一定数量的字符串添加到队列中,并使用多个线程处理它们.使用ConcurrentQueue
这是线程安全的.
I am new to multi threading concepts. I need to add certain number of strings to a queue and process them with multiple threads. Using ConcurrentQueue
which is thread safe.
这是我尝试过的.但是不会处理添加到并发队列中的所有项目.仅处理前4个项目.
This is what I have tried. But all the items added into concurrent queue are not processed. only first 4 items are processed.
class Program
{
ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Task.WaitAll(workers);
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
}
推荐答案
您的实现存在两个问题.第一个显而易见的是worker
方法仅使零或一个项目出队,然后停止:
There are a couple of issues with your implementation. The first and obvious one is that the worker
method only dequeues zero or one item and then stops:
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
应该是:
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
但这不足以使您的程序正常运行.如果您的工作人员出队的速度快于主线程入队的速度,则他们将在主任务仍在排队时停止.您需要通知工人他们可以停车.您可以定义一个布尔变量,完成入队后将其设置为true
:
That however won't be enough to make your program work properly. If your workers are dequeueing faster than the main thread is enqueueing, they will stop while the main task is still enqueueing. You need to signal the workers that they can stop. You can define a boolean variable that will be set to true
once enqueueing is done:
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);
工人将检查该值:
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
do {
string op;
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
}
while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
Console.WriteLine("Worker {0} is stopping.", workerId);
}
这篇关于具有多线程的ConcurrentQueue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!