使用取消令牌停止 Parallel.ForEach 并停止 [英] Stopping Parallel.ForEach with Cancellation Token and Stop
问题描述
我不确定是否按照我的意图停止了 Parallel.ForEach
循环.所以让我概述一下问题.
循环使用可用连接有限的数据库驱动程序,并且需要跟踪打开的连接,因此数据库驱动程序不会抛出异常.问题是跟踪打开的连接是手动实现的(这应该重构 - 编写包装器或使用 AutoResetEvent
但还有一些其他事情需要先处理).所以我需要跟踪打开的连接,尤其是我必须处理异常的情况:
Parallel.ForEach(hugeLists, parallelOptions, currentList => {WaitForDatabaseConnection();尝试 {Interlocked.Increment(ref numOfOpenConnections);DoDatabaseCallAndInsertions();} 捕捉(异常前){//记录扔;} 最后 {Interlocked.Decrement(ref numOfOpenConnections);}}
这是没有取消的循环的简化版本.为了提高发生异常时的性能,当抛出异常时,应尽快取消循环.如果一件事失败,循环应该停止.
我如何才能确保 numOfOpenConnections
正确更新?
到目前为止我所尝试的(这是我想要的还是我遗漏了什么?):
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {parallelOptions.CancellationToken.ThrowIfCancellationRequested();WaitForDatabaseConnection();尝试 {Interlocked.Increment(ref numOfOpenConnections);DoDatabaseCallAndInsertions();} 捕捉(异常前){//记录取消TokenSource.Cancel();parallelLoopState.Stop();扔;//还是想保留原来的异常信息} 最后 {Interlocked.Decrement(ref numOfOpenConnections);}}
我可以将这段代码包装在 try - catch 结构中并捕获 AggregateException
.
您可以调用 DoDatabaseCallAndInsertions
方法,该方法仅在循环状态不是 exceptional,否则忘记它并允许并行循环立即完成.使用可取消的包装器可能是实现这一目标的最简单方法.这是一个等待函数完成或 CancellationToken
被取消的方法 RunAsCancelable
,无论先发生什么:
public static TResult RunAsCancelable(Func函数,CancellationToken 令牌){token.ThrowIfCancellationRequested();任务<TResult>任务 = Task.Run(function, token);尝试{//等待函数完成,或者令牌被取消任务.等待(令牌);}catch { }//防止抛出 AggregateExceptiontoken.ThrowIfCancellationRequested();//传播结果,或者原始异常解包返回 task.GetAwaiter().GetResult();}public static void RunAsCancelable(Action action, CancellationToken token)=>RunAsCancelable
RunAsCancelable
方法会抛出一个 OperationCanceledException
,以防令牌在 action
完成之前被取消,或者传播发生在action
,或者如果 action
成功完成,则成功完成.
使用示例:
using (var failureCTS = new CancellationTokenSource())//沟通失败{Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>{WaitForDatabaseConnection();尝试{Interlocked.Increment(ref numOfOpenConnections);RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),失败CTS.Token);}catch (OperationCanceledException ex)当(例如.CancellationToken == failureCTS.Token){//什么都不做(在另一个线程中发生异常)}捕获(异常前){Log.Error(ex);失败CTS.Cancel();//向其他线程发出失败信号扔;//通知并行循环发生了错误}最后{Interlocked.Decrement(ref numOfOpenConnections);}});}
DoDatabaseCallAndInsertions
方法可以在各个点检查CancellationToken
参数的属性IsCancellationRequested
,并在需要时执行事务回滚.>
需要注意的是,RunAsCancelable
方法对于ThreadPool
线程的使用非常浪费.必须阻塞一个额外的线程,以使每个提供的操作都可以取消,因此每次执行 lambda 都需要两个线程.为了防止 ThreadPool
可能出现饥饿,在切换到每 500 毫秒创建一个算法之前,最好增加线程池按需创建的最小线程数,通过使用 ThreadPool.SetMinThreads
应用程序启动时的方法.
ThreadPool.SetMinThreads(100, 10);
<小时>
重要提示:上述解决方案并未尝试记录可能被遗忘的操作的异常情况.只会记录第一个失败操作的异常.
I'm not sure if I'm stopping a Parallel.ForEach
loop as I intend to do.
So let me outline the problem.
The loop uses a database driver with limited available connections and it is required to keep track of the open connections, so the database driver doesn't throw an exception. The issue is that keeping track of open connections has been implemented manually (this should be refactored - writing a wrapper or using AutoResetEvent
but there are some other things that need to be taken care of first). So I need to keep track of the open connections and especially I have to handle the case of an exception:
Parallel.ForEach(hugeLists, parallelOptions, currentList => {
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
throw;
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
This is the simplified version of the loop without cancellation. To improve the performance in case of an Exception the loop should be cancelled as soon as possible when an Exception is thrown. If one thing fails the loop should stop.
How can I achieve that making sure that numOfOpenConnections
is being updated correctly?
What I have tried so far (is this behaving like I want it to or am I missing something?):
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) => {
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
WaitForDatabaseConnection();
try {
Interlocked.Increment(ref numOfOpenConnections);
DoDatabaseCallAndInsertions();
} catch (Exception ex) {
// logging
cancellationTokenSource.Cancel();
parallelLoopState.Stop();
throw; // still want to preserve the original exception information
} finally {
Interlocked.Decrement(ref numOfOpenConnections);
}
}
I could wrap this code in a try - catch construct and catch AggregateException
.
You could call the DoDatabaseCallAndInsertions
method in a way that waits for its completion only while the state of the loop is not exceptional, and otherwise forgets about it and allows the parallel loop to complete immediately. Using a cancelable wrapper is probably the simplest way to achieve this. Here is a method RunAsCancelable
that waits for a function to complete, or a CancellationToken
to become canceled, whatever comes first:
public static TResult RunAsCancelable<TResult>(Func<TResult> function,
CancellationToken token)
{
token.ThrowIfCancellationRequested();
Task<TResult> task = Task.Run(function, token);
try
{
// Wait for the function to complete, or the token to become canceled
task.Wait(token);
}
catch { } // Prevent an AggregateException to be thrown
token.ThrowIfCancellationRequested();
// Propagate the result, or the original exception unwrapped
return task.GetAwaiter().GetResult();
}
public static void RunAsCancelable(Action action, CancellationToken token)
=> RunAsCancelable<object>(() => { action(); return null; }, token);
The RunAsCancelable
method throws an OperationCanceledException
in case the token was canceled before the completion of the action
, or propagates the exception occurred in the action
, or completes successfully if the action
completed successfully.
Usage example:
using (var failureCTS = new CancellationTokenSource()) // Communicates failure
{
Parallel.ForEach(hugeLists, parallelOptions, (currentList, parallelLoopState) =>
{
WaitForDatabaseConnection();
try
{
Interlocked.Increment(ref numOfOpenConnections);
RunAsCancelable(() => DoDatabaseCallAndInsertions(failureCTS.Token),
failureCTS.Token);
}
catch (OperationCanceledException ex)
when (ex.CancellationToken == failureCTS.Token)
{
// Do nothing (an exception occurred in another thread)
}
catch (Exception ex)
{
Log.Error(ex);
failureCTS.Cancel(); // Signal failure to the other threads
throw; // Inform the parallel loop that an error has occurred
}
finally
{
Interlocked.Decrement(ref numOfOpenConnections);
}
});
}
The DoDatabaseCallAndInsertions
method can inspect the property IsCancellationRequested
of the CancellationToken
parameter at various points, and perform a transaction rollback if needed.
It should be noted that the RunAsCancelable
method is quite wasteful regarding the usage of ThreadPool
threads. One extra thread must be blocked in order to make each supplied action cancelable, so two threads are needed for each execution of the lambda. To prevent a possible starvation of the ThreadPool
, it is probably a good idea to increase the minimum number of threads that the thread pool creates on demand before switching to the create-one-every-500-msec algorithm, by using the ThreadPool.SetMinThreads
method at the startup of the application.
ThreadPool.SetMinThreads(100, 10);
Important: The above solution makes no attempt to log the possible exceptions of the operations that have been forgotten. Only the exception of the first failed operation will be logged.
这篇关于使用取消令牌停止 Parallel.ForEach 并停止的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!