Parallel.ForEach vs Async Forloop in Heavy I/O Ops

Question

I want to compare two theoretical scenarios. I have simplified the cases for the purpose of the question, but basically it's your typical producer/consumer scenario. (I'm focusing on the consumer.)

I have a large Queue<string> dataQueue that I have to transmit to multiple clients.

So let's start with the simpler case:

using System.Collections.Generic;
using System.Threading;

class SequentialBlockingCase
 {
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                foreach (var destination in _destinations)
                {
                    SendDataToDestination(destination, data);
                }
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post, instead simulate the send
        Thread.Sleep(200);
    }
}

Now this setup works just fine. It sits there and polls the queue, and when there is data to send, it sends it to all the destinations.

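Problems:

  • If one of the destinations is unavailable or slow, it affects all the other destinations.
  • It does not use multiple threads to send in parallel.
  • Every transmission to every destination blocks.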

So here is my second attempt:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class ParallelBlockingCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                Parallel.ForEach(_destinations, destination =>
                {
                    SendDataToDestination(destination, data);
                });
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        Thread.Sleep(200);
    }
}

This revision at least does not affect the other destinations if one destination is slow or unavailable.

However, this method is still blocking, and I am not sure whether Parallel.ForEach makes use of the thread pool. My understanding is that it will create X threads/tasks and execute 4 at a time (on a 4-core CPU), but it has to completely finish task 1 before task 5 can start.

Hence my third option:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class ParallelAsyncCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string> { };

    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                List<Task> tasks = new List<Task>();
                foreach (var destination in _destinations)
                {
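                    // An async method returns an already-started ("hot") task;
                    // calling Start() on it would throw InvalidOperationException.
                    var task = SendDataToDestination(destination, data);
                    tasks.Add(task);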
                }

                //Wait for all tasks to complete
                Task.WaitAll(tasks.ToArray());
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }

    private static async Task SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        await Task.Delay(200);
    }
}

Now, from my understanding, this option will still block the main thread at Task.WaitAll(tasks.ToArray());, which is fine because I don't want it to run away creating tasks faster than they can be executed.

But the tasks that execute in parallel should make use of the ThreadPool, and all X tasks should start executing at once, neither blocking nor running sequentially. (The thread pool will swap between them as they become active or are awaiting.)

Now my question:

Does option 3 have any performance benefit over option 2?

Specifically in a higher-performance server-side scenario. In the specific software I am working on now, there would be multiple instances of my simple use case above, i.e. several consumers.

I'm interested in the theoretical differences and the pros and cons of the two solutions, and maybe even a better 4th option if there is one.

Recommended answer

Parallel.ForEach will use the thread pool. Asynchronous code will not, since it doesn't need any threads at all (link is to my blog).
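To make the "no threads needed" point concrete, here is a minimal sketch that starts many simulated sends at once and awaits them together. The names AsyncWaitSketch, SimulatedSendAsync and RunAsync are made up for illustration, and Task.Delay stands in for real I/O, as it does in the question.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncWaitSketch
{
    // Simulated I/O-bound send (assumption: stands in for an HTTP POST).
    // Task.Delay is timer-based, so no thread is blocked while it is pending.
    public static async Task<int> SimulatedSendAsync(int id)
    {
        await Task.Delay(200);
        return id;
    }

    // Starts `count` simulated sends at once, then waits for all of them.
    public static async Task<(int Completed, TimeSpan Elapsed)> RunAsync(int count)
    {
        var stopwatch = Stopwatch.StartNew();

        // ToArray() forces every send to start before anything is awaited.
        Task<int>[] sends = Enumerable.Range(0, count)
            .Select(id => SimulatedSendAsync(id))
            .ToArray();

        int[] results = await Task.WhenAll(sends);
        stopwatch.Stop();
        return (results.Length, stopwatch.Elapsed);
    }
}
```

Calling await AsyncWaitSketch.RunAsync(1000) should typically report an elapsed time much closer to one 200 ms delay than to 1000 of them, because the waits overlap instead of each occupying a thread.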

As Mrinal pointed out, if you have CPU-bound code, parallelism is appropriate; if you have I/O-bound code, asynchrony is appropriate. In this case, an HTTP POST is clearly I/O, so the ideal consuming code would be asynchronous.
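The question leaves the actual HTTP POST as a TODO. As a rough sketch only, an asynchronous version might look like the following with HttpClient. The helper name TrySendAsync, the per-request timeout and the "text/plain" media type are my assumptions, not part of the original code.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class HttpSendSketch
{
    // Hypothetical helper: POSTs `data` to `destination` and reports success.
    // Returning false instead of throwing keeps one bad destination from
    // faulting a whole batch of sends.
    public static async Task<bool> TrySendAsync(
        HttpClient client, string destination, string data, TimeSpan timeout)
    {
        // Cancels the request if the destination does not answer in time.
        using (var cts = new CancellationTokenSource(timeout))
        using (var content = new StringContent(data, Encoding.UTF8, "text/plain"))
        {
            try
            {
                using (HttpResponseMessage response =
                    await client.PostAsync(destination, content, cts.Token))
                {
                    return response.IsSuccessStatusCode;
                }
            }
            catch (HttpRequestException)
            {
                return false; // connection or protocol failure
            }
            catch (OperationCanceledException)
            {
                return false; // timed out or otherwise canceled
            }
        }
    }
}
```

A single HttpClient instance would normally be shared across all sends. Whether a failed send should be retried, logged or dropped depends on the application and is left open here.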

"maybe even a better 4th option if there is one."

I would recommend making your consumer fully asynchronous. In order to do so, you'll need to use an async-compatible producer/consumer queue. There's a fairly advanced one (BufferBlock<T>) in the TPL Dataflow library, and a fairly simple one (AsyncProducerConsumerQueue<T>) in my AsyncEx library.

With either of them, you can create a fully asynchronous consumer:

List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
  var task = SendDataToDestination(destination, data);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

Or, more simply:

var tasks = _destinations
    .Select(destination => SendDataToDestination(destination, data));
await Task.WhenAll(tasks);
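The snippets above assume that data has already been taken off the queue. As a sketch of how the surrounding loop could look with BufferBlock<T>, assuming a single consumer, something like the following replaces the polling and Thread.Sleep. The class name AsyncConsumerSketch and the send delegate are hypothetical stand-ins, the latter for SendDataToDestination.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncConsumerSketch
{
    // Consumes items until the queue is completed and drained, and
    // returns how many items were processed.
    public static async Task<int> RunAsync(
        BufferBlock<string> queue,
        IReadOnlyCollection<string> destinations,
        Func<string, string, Task> send)
    {
        int processed = 0;

        // Waits asynchronously for data; yields false once Complete()
        // has been called and the buffer is empty.
        while (await queue.OutputAvailableAsync())
        {
            // Safe only with a single consumer: nobody else can take the item.
            string data = await queue.ReceiveAsync();

            // Start every send, then await them together.
            IEnumerable<Task> tasks = destinations.Select(d => send(d, data));
            await Task.WhenAll(tasks);
            processed++;
        }

        return processed;
    }
}
```

The producer side would call queue.Post(item) (or await queue.SendAsync(item)) for each item and queue.Complete() when it is done. Awaiting the whole batch before the next receive keeps the same back-pressure as Task.WaitAll in option 3, without blocking a thread.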
