Parallel.ForEach - 优美取消 [英] Parallel.ForEach - Graceful Cancellation

查看:284
本文介绍了Parallel.ForEach - 优美取消的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在等待这个问题,直到任务完成和线程同步。

On the subject of waiting until tasks are complete and thread synchronisation.

目前,我有一个迭代我有一个Parallel.ForEach括起来。在下面的例子我提出的关于如何最好地处理正常终止循环(.NET 4.0)的意见的一些问题;

I currently have an iteration i have enclosed within a Parallel.ForEach. In the Example below I have posed some questions in the comments about how best to handle the graceful termination of the loop (.NET 4.0);

private void myFunction()
    {

        IList<string> iListOfItems = new List<string>();
        // populate iListOfItems

        CancellationTokenSource cts = new CancellationTokenSource();

        ParallelOptions po = new ParallelOptions();
        po.MaxDegreeOfParallelism = 20; // max threads
        po.CancellationToken = cts.Token;

        try
        {
            var myWcfProxy = new myWcfClientSoapClient();

            if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
            {
                try
                {
                    if (_requestedToStop)
                        loopsate.Stop();
                    // long running blocking WS call, check before and after
                    var response = myWcfProxy.ProcessIntervalConfiguration(item);
                    if (_requestedToStop)
                        loopsate.Stop();

                    // perform some local processing of the response object
                }
                catch (Exception ex)
                {
                    // cannot continue game over.
                    if (myWcfProxy.State == CommunicationState.Faulted)
                    {
                        loopsate.Stop();
                        throw;
                    }
                }

                // else carry on..
                // raise some events and other actions that could all risk an unhanded error.

            }
            ).IsCompleted)
            {
                RaiseAllItemsCompleteEvent();
            }
        }
        catch (Exception ex)
        {
            // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
            // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception?
            // Do I need to call cts.Cancel here?

            // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that?

            // do i need to call cts.Dispose() ?

            MessageBox.Show(Logging.FormatException(ex));
        }
        finally
        {

            if (myWcfProxy != null)
            {
            // possible race condition with the for-each threads here unless we wait for them to terminate.
                if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
                    myWcfProxy.Abort();

                myWcfProxy.Close();
            }

            // possible race condition with the for-each threads here unless we wait for them to terminate.
            _requestedToStop = false;

        }

    }

任何帮助将是最AP preciated。 ManualResetEventSlim的和cancellationToken.WaitHandle的的MSDN文档会谈。但不知道如何将它们连接到这一点,似乎是在努力理解MSDN的例子,因为大多数不适用。

Any help would be most appreciated. The MSDN documentation talks of ManualResetEventSlim's and cancellationToken.WaitHandle's. but not sure how to wire them into this, seem to be struggling understanding the MSDN examples as most dont apply.

推荐答案

我还嘲笑了以下一些code可以回答你的问题。最基本的一点是,你得到叉/ join并行与Parallel.ForEach,所以你不必担心并行任务之外的竞争条件(调用线程阻塞,直到任务完成,成功或其他)。你只需要确保使用LoopState变量(第二个参数的lambda)来控制你的循环状态。

I have mocked up some code below that may answer your question. The basic point is that you get fork/join parallelism with Parallel.ForEach, so you don't need to worry about race conditions outside of the parallel task (the calling thread blocks until the tasks have completed, successfully or otherwise). You just want to make sure to use the LoopState variable (the second argument to the lambda) to control your loop state.

如果循环的任何迭代引发了未处理的异常,整个环路将提高捉住了在年底AggregateException。

If any iteration of the loop threw an unhandled exception, the overall loop will raise the AggregateException caught at the end.

这提这个话题其他链接:

Other links that mention this topic:

<一个href="http://stackoverflow.com/questions/3832852/parallel-foreach-throws-exception-when-processing-extremely-large-sets-of-data">Parallel.ForEach处理非常大的数据集时抛出异常

<一个href="http://msdn.microsoft.com/en-us/library/dd460720.aspx">http://msdn.microsoft.com/en-us/library/dd460720.aspx

<一个href="http://stackoverflow.com/questions/1114317/does-parallel-foreach-limits-the-number-of-active-threads">Does Parallel.ForEach限制活动线程的数量?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
    public class Class1
    {
        private class MockWcfProxy
        {
            internal object ProcessIntervalConfiguration(string item)
            {
                return new Object();
            }

            public CommunicationState State { get; set; }
        }

        private void myFunction()
        {

            IList<string> iListOfItems = new List<string>();
            // populate iListOfItems

            CancellationTokenSource cts = new CancellationTokenSource();

            ParallelOptions po = new ParallelOptions();
            po.MaxDegreeOfParallelism = 20; // max threads
            po.CancellationToken = cts.Token;

            try
            {
                var myWcfProxy = new MockWcfProxy();

                if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
                    {
                        try
                        {
                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // long running blocking WS call, check before and after
                            var response = myWcfProxy.ProcessIntervalConfiguration(item);

                            if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                                loopState.Stop();

                            // perform some local processing of the response object
                        }
                        catch (Exception ex)
                        {
                            // cannot continue game over.
                            if (myWcfProxy.State == CommunicationState.Faulted)
                            {
                                loopState.Stop();
                                throw;
                            }

                            // FYI you are swallowing all other exceptions here...
                        }

                        // else carry on..
                        // raise some events and other actions that could all risk an unhanded error.
                    }
                ).IsCompleted)
                {
                    RaiseAllItemsCompleteEvent();
                }
            }
            catch (AggregateException aggEx)
            {
                // This section will be entered if any of the loops threw an unhandled exception.  
                // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
                // to see those (if you want).
            }
            // Execution will not get to this point until all of the iterations have completed (or one 
            // has failed, and all that were running when that failure occurred complete).
        }

        private void RaiseAllItemsCompleteEvent()
        {
            // Everything completed...
        }
    }
}

这篇关于Parallel.ForEach - 优美取消的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆