How will Parallel.ForEach behave when iterating over the results of a method call?

Problem description

Scope:

I am currently implementing an application that uses the Amazon SQS service as the source of the data it processes.

Since I need to process the messages dequeued from this queue in parallel, this is what I did:

Parallel.ForEach(GetMessages(msgAttributes), new ParallelOptions { MaxDegreeOfParallelism = threadCount }, message =>
{
    // Processing Logic
});

Here's the header of the "GetMessages" method:

    private static IEnumerable<Message> GetMessages (List<String> messageAttributes = null)
    {
        // Dequeueing Logic... 10 At a Time

        // Yielding the messages to the Parallel Loop
        foreach (Message awsMessage in messages)
        {
           yield return awsMessage;
        }
    }

How would this work?

My initial thought was that the GetMessages method would be executed whenever the threads had no work (or a good number of threads had no work, based on some internal heuristic). In other words, I expected GetMessages to distribute the messages to the Parallel.ForEach worker threads, which would process them and then wait for the Parallel.ForEach handler to give them more messages to work on.

The problem? I was wrong...

The thing is, I was wrong. Still, I have no idea what's happening in this situation.

The number of messages being dequeued is way too high, and it grows by powers of 2 every time they get dequeued. The dequeue counts (in messages) go as follows:

  1. Dequeue called: returned 80 messages
  2. Dequeue called: returned 160 messages
  3. Dequeue called: returned 320 messages (and so on)

After a certain point, the number of messages being dequeued (or, in this case, waiting to be processed by my application) is too high, and I end up running out of memory.

More information:

I am using thread-safe Interlocked operations to increment the counters mentioned above.

The number of threads being used is 25 (for the Parallel.ForEach).

Each "GetMessages" call returns up to 10 messages (yielded as an IEnumerable).

Question: what exactly is happening in this scenario?

I am having a hard time figuring out what exactly is going on. Is my GetMessages method being invoked by each thread once it finishes the "Processing Loop", hence leading to more and more messages being dequeued?

Is the call to "GetMessages" made by a single thread, or by multiple threads?

Recommended answer

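I think there's an issue with Parallel.ForEach partitioning: by default, when its source is a plain IEnumerable<T>, Parallel.ForEach uses chunk partitioning, so worker threads take items from the shared enumerator in chunks that grow over time, and the source can be read well ahead of the processing. Your question is a typical producer/consumer scenario. For such a case, you should keep the dequeuing logic on one side, independent from the processing logic on the other. This respects separation of concerns and simplifies debugging.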
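One way to answer the two questions from the post empirically is to instrument a stand-in for GetMessages. The sketch below is hypothetical: FakeGetMessages, the counters and the item/thread counts are illustrative and not from the post. It counts how many times the iterator body starts, which threads end up executing it, and how far reading from the source runs ahead of finished processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PartitioningDemo
{
    public static int IteratorStarts; // how many times the iterator body began executing
    public static int Pulled;         // items handed out by the iterator so far
    public static int Processed;      // items whose loop body has finished
    public static int MaxAhead;       // largest observed (Pulled - Processed)
    public static readonly ConcurrentDictionary<int, bool> IteratorThreads =
        new ConcurrentDictionary<int, bool>();

    // Hypothetical stand-in for GetMessages: yields 'count' fake message ids.
    private static IEnumerable<int> FakeGetMessages(int count)
    {
        Interlocked.Increment(ref IteratorStarts); // runs on the first MoveNext call
        for (int i = 0; i < count; i++)
        {
            // Remember which thread is running the iterator body, i.e. calling MoveNext.
            IteratorThreads[Thread.CurrentThread.ManagedThreadId] = true;
            Interlocked.Increment(ref Pulled);
            yield return i;
        }
    }

    public static void Run(int count, int threadCount)
    {
        Parallel.ForEach(FakeGetMessages(count),
            new ParallelOptions { MaxDegreeOfParallelism = threadCount },
            message =>
            {
                // Items already taken from the source but not yet finished.
                int ahead = Volatile.Read(ref Pulled) - Volatile.Read(ref Processed);
                UpdateMax(ref MaxAhead, ahead);
                Thread.Sleep(2); // simulated processing
                Interlocked.Increment(ref Processed);
            });

        Console.WriteLine($"iterator starts: {IteratorStarts}, " +
                          $"threads that ran the iterator: {IteratorThreads.Count}, " +
                          $"max pulled but unfinished: {MaxAhead}");
    }

    // Lock-free "store the maximum" using compare-and-swap.
    private static void UpdateMax(ref int target, int value)
    {
        int seen;
        while (value > (seen = Volatile.Read(ref target)) &&
               Interlocked.CompareExchange(ref target, value, seen) != seen)
        {
        }
    }
}
```

A call such as PartitioningDemo.Run(200, 4) should report a single iterator start: Parallel.ForEach enumerates the method's result through one shared enumerator, while MoveNext may be called from several worker threads, one at a time. How far reading runs ahead of processing depends on the default chunk partitioning and can vary between runs and runtime versions.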

BlockingCollection<T> lets you separate the two: on one side you add items to be processed, and on the other you consume them. Here's an example of how to implement it:

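You will need the ParallelExtensionsExtras NuGet package for BlockingCollection<T> workload partitioning: it adds a GetConsumingPartitioner() extension method. The .GetConsumingEnumerable() call in the process method below is built into BlockingCollection<T> itself and does not require the package.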
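On .NET Framework 4.5 and later (including .NET Core), a built-in way to stop Parallel.ForEach from buffering items taken out of GetConsumingEnumerable() is to wrap it with Partitioner.Create and EnumerablePartitionerOptions.NoBuffering. This is a swapped-in alternative to the package-based partitioning mentioned above, not the answer's own code; NoBufferingConsumer, Run and threadCount are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class NoBufferingConsumer
{
    public static readonly ConcurrentBag<string> Result = new ConcurrentBag<string>();

    public static void Run(BlockingCollection<string> collection, int threadCount)
    {
        // NoBuffering makes each worker take one item at a time from the consuming
        // enumerable instead of grabbing growing chunks of items.
        var source = Partitioner.Create(collection.GetConsumingEnumerable(),
                                        EnumerablePartitionerOptions.NoBuffering);

        // Returns once CompleteAdding() has been called and the collection is drained.
        Parallel.ForEach(source,
            new ParallelOptions { MaxDegreeOfParallelism = threadCount },
            item =>
            {
                // Processing Logic
                Result.Add("Processed : " + item);
            });
    }
}
```

The trade-off is more contention on the shared enumerator, which is usually negligible when each message takes far longer to process than to dequeue.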

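using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProducerConsumer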
{
    public static ConcurrentQueue<String> SqsQueue = new ConcurrentQueue<String>();         
    public static BlockingCollection<String> Collection = new BlockingCollection<String>();
    public static ConcurrentBag<String> Result = new ConcurrentBag<String>();

    public static async Task TestMethod()
    {
        // Run the fake SQS producer, the dequeuer and the parallel consumer as independent tasks
        Task sqs = Task.Run(async () =>
        {
            Console.WriteLine("Amazon on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            while (true)
            {
                ProducerConsumer.BackgroundFakedAmazon(); // We produce 50 Strings each second
                await Task.Delay(1000);
            }
        });
        Task deq = Task.Run(async () =>
        {
            Console.WriteLine("Dequeue on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            while (true)
            {
                ProducerConsumer.DequeueData(); // Dequeue 20 Strings each 100ms 
                await Task.Delay(100);
            }
        });

        Task process = Task.Run(() =>
        {
            Console.WriteLine("Process on thread " + Thread.CurrentThread.ManagedThreadId.ToString());
            ProducerConsumer.BackgroundParallelConsumer(); // Process all the Strings in the BlockingCollection
        });

        // All three loops run indefinitely, so in this demo the await never completes
        await Task.WhenAll(sqs, deq, process);
    }

    public static void DequeueData()
    {
        foreach (var i in Enumerable.Range(0, 20))
        {
            String dequeued = null;
            if (SqsQueue.TryDequeue(out dequeued))
            {
                Collection.Add(dequeued);
                Console.WriteLine("Dequeued : " + dequeued);
            }
        }
    }

    public static void BackgroundFakedAmazon()
    {
        Console.WriteLine(" ---------- Generate 50 items on amazon side  ---------- ");
        foreach (var data in Enumerable.Range(0, 50).Select(i => Path.GetRandomFileName().Split('.').FirstOrDefault()))
            SqsQueue.Enqueue(data + " / ASQS");
    }

    public static void BackgroundParallelConsumer()
    {
        // Parallel.ForEach keeps waiting here: after processing the available items it blocks until the next ones are added
        Parallel.ForEach(Collection.GetConsumingEnumerable(), (i) =>
        {
            // Processing Logic
            String processedData = "Processed : " + i;
            Result.Add(processedData);
            Console.WriteLine(processedData);
        });

    }
}

You can try it from a console app like this:

class Program
{
    static void Main(string[] args)
    {
        ProducerConsumer.TestMethod().Wait();
    }
}
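The ProducerConsumer sample never terminates, and its BlockingCollection<T> is unbounded. A variant aimed more directly at the out-of-memory symptom from the question gives the collection a boundedCapacity, so Add blocks once that many messages are waiting, and calls CompleteAdding so the consuming loop ends. This is a sketch under assumptions: FakeReceiveBatch stands in for a real SQS receive call returning up to 10 messages, and BoundedPipeline, the batch count and the capacity are all hypothetical.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    public static int Processed;   // messages whose processing finished
    public static int MaxBacklog;  // largest queue.Count seen by a worker

    // Hypothetical stand-in for one SQS receive call returning up to 10 messages.
    private static string[] FakeReceiveBatch(int batchIndex) =>
        Enumerable.Range(0, 10).Select(i => $"msg {batchIndex}-{i}").ToArray();

    public static async Task RunAsync(int batches, int capacity, int threadCount)
    {
        // With a bounded capacity, Add blocks once 'capacity' messages are waiting,
        // so the producer cannot get arbitrarily far ahead of the consumers.
        using (var queue = new BlockingCollection<string>(boundedCapacity: capacity))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    for (int b = 0; b < batches; b++)
                        foreach (string msg in FakeReceiveBatch(b))
                            queue.Add(msg);
                }
                finally
                {
                    queue.CompleteAdding(); // lets the consuming loop finish
                }
            });

            Task consumer = Task.Run(() =>
            {
                var source = Partitioner.Create(queue.GetConsumingEnumerable(),
                                                EnumerablePartitionerOptions.NoBuffering);
                Parallel.ForEach(source,
                    new ParallelOptions { MaxDegreeOfParallelism = threadCount },
                    msg =>
                    {
                        UpdateMax(ref MaxBacklog, queue.Count);
                        Thread.Sleep(1); // simulated processing
                        Interlocked.Increment(ref Processed);
                    });
            });

            await Task.WhenAll(producer, consumer);
        }
    }

    // Lock-free "store the maximum" using compare-and-swap.
    private static void UpdateMax(ref int target, int value)
    {
        int seen;
        while (value > (seen = Volatile.Read(ref target)) &&
               Interlocked.CompareExchange(ref target, value, seen) != seen)
        {
        }
    }
}
```

The blocking Add is what caps memory here; pairing it with NoBuffering keeps Parallel.ForEach from holding extra messages in its own chunk buffers on top of the queued ones.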
