使用取消令牌停止 Parallel.ForEach 并停止 [英] Stopping Parallel.ForEach with Cancellation Token and Stop

查看:112
本文介绍了使用取消令牌停止 Parallel.ForEach 并停止的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我不确定是否按照我的意图停止了 Parallel.ForEach 循环.所以让我概述一下问题.

循环使用可用连接有限的数据库驱动程序,并且需要跟踪打开的连接,因此数据库驱动程序不会抛出异常.问题是跟踪打开的连接是手动实现的(这应该重构 - 编写包装器或使用 AutoResetEvent 但还有一些其他事情需要先处理).所以我需要跟踪打开的连接,尤其是我必须处理异常的情况:

Parallel.ForEach(hugeLists, parallelOptions, currentList => {WaitForDatabaseConnection();尝试 {Interlocked.Increment(ref numOfOpenConnections);DoDatabaseCallAndInsertions();} 捕捉(异常前){//记录扔;} 最后 {Interlocked.Decrement(ref numOfOpenConnections);}}

这是没有取消的循环的简化版本.为了提高发生异常时的性能,当抛出异常时,应尽快取消循环.如果一件事失败,循环应该停止.

我如何才能确保 numOfOpenConnections 正确更新?

到目前为止我所尝试的(这是我想要的还是我遗漏了什么?):

Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {parallelOptions.CancellationToken.ThrowIfCancellationRequested();WaitForDatabaseConnection();尝试 {Interlocked.Increment(ref numOfOpenConnections);DoDatabaseCallAndInsertions();} 捕捉(异常前){//记录取消TokenSource.Cancel();parallelLoopState.Stop();扔;//还是想保留原来的异常信息} 最后 {Interlocked.Decrement(ref numOfOpenConnections);}}

我可以将这段代码包装在 try - catch 结构中并捕获 AggregateException.

解决方案

您可以调用 DoDatabaseCallAndInsertions 方法,该方法仅在循环状态不是 exceptional,否则忘记它并允许并行循环立即完成.使用可取消的包装器可能是实现这一目标的最简单方法.这是一个等待函数完成或 CancellationToken 被取消的方法 RunAsCancelable,无论先发生什么:

public static TResult RunAsCancelable(Func函数,CancellationToken 令牌){token.ThrowIfCancellationRequested();任务<TResult>任务 = Task.Run(function, token);尝试{//等待函数完成,或者令牌被取消任务.等待(令牌);}catch { }//防止抛出 AggregateExceptiontoken.ThrowIfCancellationRequested();//传播结果,或者原始异常解包返回 task.GetAwaiter().GetResult();}public static void RunAsCancelable(Action action, CancellationToken token)=>RunAsCancelable(() => { action(); return null; }, token);

RunAsCancelable 方法会抛出一个 OperationCanceledException,以防令牌在 action 完成之前被取消,或者传播发生在action,或者如果 action 成功完成,则成功完成.

使用示例:

using (var failureCTS = new CancellationTokenSource())//沟通失败{Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>{WaitForDatabaseConnection();尝试{Interlocked.Increment(ref numOfOpenConnections);RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),失败CTS.Token);}catch (OperationCanceledException ex)当(例如.CancellationToken == failureCTS.Token){//什么都不做(在另一个线程中发生异常)}捕获(异常前){Log.Error(ex);失败CTS.Cancel();//向其他线程发出失败信号扔;//通知并行循环发生了错误}最后{Interlocked.Decrement(ref numOfOpenConnections);}});}

DoDatabaseCallAndInsertions 方法可以在各个点检查CancellationToken 参数的属性IsCancellationRequested,并在需要时执行事务回滚.>

需要注意的是,RunAsCancelable 方法对于ThreadPool 线程的使用非常浪费.必须阻塞一个额外的线程,以使每个提供的操作都可以取消,因此每次执行 lambda 都需要两个线程.为了防止 ThreadPool 可能出现饥饿,在切换到每 500 毫秒创建一个算法之前,最好增加线程池按需创建的最小线程数,通过使用 ThreadPool.SetMinThreads 应用程序启动时的方法.

ThreadPool.SetMinThreads(100, 10);

<小时>

重要提示:上述解决方案并未尝试记录可能被遗忘的操作的异常情况.只会记录第一个失败操作的异常.

I'm not sure if I'm stopping a Parallel.ForEach loop as I intend to do. So let me outline the problem.

The loop uses a database driver with limited available connections and it is required to keep track of the open connections, so the database driver doesn't throw an exception. The issue is that keeping track of open connections has been implemented manually (this should be refactored - writing a wrapper or using AutoResetEvent but there are some other things that need to be taken care of first). So I need to keep track of the open connections and especially I have to handle the case of an exception:

Parallel.ForEach(hugeLists, parallelOptions, currentList => {
  WaitForDatabaseConnection();
  try {
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     throw;
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
}

This is the simplified version of the loop without cancellation. To improve the performance in case of an Exception the loop should be cancelled as soon as possible when an Exception is thrown. If one thing fails the loop should stop.

How can I achieve that making sure that numOfOpenConnections is being updated correctly?

What I have tried so far (is this behaving like I want it to or am I missing something?):

Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
  parallelOptions.CancellationToken.ThrowIfCancellationRequested();
  WaitForDatabaseConnection();
  try {     
     Interlocked.Increment(ref numOfOpenConnections);  
     DoDatabaseCallAndInsertions();
  } catch (Exception ex) {
     // logging
     cancellationTokenSource.Cancel();  
     parallelLoopState.Stop();
     throw; // still want to preserve the original exception information
  } finally {
     Interlocked.Decrement(ref numOfOpenConnections);
  } 
}

I could wrap this code in a try - catch construct and catch AggregateException.

解决方案

You could call the DoDatabaseCallAndInsertions method in a way that waits for its completion only while the state of the loop is not exceptional, and otherwise forgets about it and allows the parallel loop to complete immediately. Using a cancelable wrapper is probably the simplest way to achieve this. Here is a method RunAsCancelable that waits for a function to complete, or a CancellationToken to become canceled, whatever comes first:

public static TResult RunAsCancelable<TResult>(Func<TResult> function,
    CancellationToken token)
{
    token.ThrowIfCancellationRequested();
    Task<TResult> task = Task.Run(function, token);
    try
    {
        // Wait for the function to complete, or the token to become canceled
        task.Wait(token);
    }
    catch { } // Prevent an AggregateException to be thrown

    token.ThrowIfCancellationRequested();
    // Propagate the result, or the original exception unwrapped
    return task.GetAwaiter().GetResult();
}

public static void RunAsCancelable(Action action, CancellationToken token)
    => RunAsCancelable<object>(() => { action(); return null; }, token);

The RunAsCancelable method throws an OperationCanceledException in case the token was canceled before the completion of the action, or propagates the exception occurred in the action, or completes successfully if the action completed successfully.

Usage example:

using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
    Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
    {
        WaitForDatabaseConnection();
        try
        {
            Interlocked.Increment(ref numOfOpenConnections);
            RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
                failureCTS.Token);
        }
        catch (OperationCanceledException ex)
            when (ex.CancellationToken == failureCTS.Token)
        {
            // Do nothing (an exception occurred in another thread)
        }
        catch (Exception ex)
        {
            Log.Error(ex);
            failureCTS.Cancel(); // Signal failure to the other threads
            throw; // Inform the parallel loop that an error has occurred
        }
        finally
        {
            Interlocked.Decrement(ref numOfOpenConnections);
        }
    });
}

The DoDatabaseCallAndInsertions method can inspect the property IsCancellationRequested of the CancellationToken parameter at various points, and perform a transaction rollback if needed.

It should be noted that the RunAsCancelable method is quite wasteful regarding the usage of ThreadPool threads. One extra thread must be blocked in order to make each supplied action cancelable, so two threads are needed for each execution of the lambda. To prevent a possible starvation of the ThreadPool, it is probably a good idea to increase the minimum number of threads that the thread pool creates on demand before switching to the create-one-every-500-msec algorithm, by using the ThreadPool.SetMinThreads method at the startup of the application.

ThreadPool.SetMinThreads(100, 10);


Important: The above solution makes no attempt to log the possible exceptions of the operations that have been forgotten. Only the exception of the first failed operation will be logged.

这篇关于使用取消令牌停止 Parallel.ForEach 并停止的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆