具有多线程的ConcurrentQueue [英] ConcurrentQueue with multithreading

查看:121
本文介绍了具有多线程的ConcurrentQueue的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是多线程概念的新手.我需要将一定数量的字符串添加到队列中,并使用多个线程处理它们.使用ConcurrentQueue这是线程安全的.

I am new to multi threading concepts. I need to add certain number of strings to a queue and process them with multiple threads. Using ConcurrentQueue which is thread safe.

这是我尝试过的.但是不会处理添加到并发队列中的所有项目.仅处理前4个项目.

This is what I have tried. But all the items added into concurrent queue are not processed. only first 4 items are processed.

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}

推荐答案

您的实现存在两个问题.第一个显而易见的是worker方法仅使零或一个项目出队,然后停止:

There are a couple of issues with your implementation. The first and obvious one is that the worker method only dequeues zero or one item and then stops:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

应该是:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

但这不足以使您的程序正常运行.如果您的工作人员出队的速度快于主线程入队的速度,则他们将在主任务仍在排队时停止.您需要通知工人他们可以停车.您可以定义一个布尔变量,完成入队后将其设置为true:

That however won't be enough to make your program work properly. If your workers are dequeueing faster than the main thread is enqueueing, they will stop while the main task is still enqueueing. You need to signal the workers that they can stop. You can define a boolean variable that will be set to true once enqueueing is done:

for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

工人将检查该值:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do {
        string op;
        while(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
    Console.WriteLine("Worker {0} is stopping.", workerId);
}  

这篇关于具有多线程的ConcurrentQueue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆