How to keep track of faulted items in a TPL pipeline in a (thread-)safe way

Question

I am using a TPL Dataflow pipeline design together with Stephen Cleary's Try library. In short, it wraps a value or an exception and floats it down the pipeline. So even when items have thrown exceptions inside their processing methods, the task I get from await resultsBlock.Completion; at the end still has Status=RanToCompletion. So I need some other way to register faulted items. Here is a small sample:

var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
{
    //SomeProcessingMethod();
    return 1;
}));
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
{
    //SomeProcessingMethod();
    return 1;
}));
var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
    {
        var exception = construct.Exception;
        switch (exception)
        {
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.
                break;
            default:
                break;
        }
    }
});

One solution would be to create a List<int> FaultedItems; into which I would insert all faulted items from my exception-handling block. Then, after await resultsBlock.Completion;, I could check whether the list is non-empty and create a new pipeline for the faulted items. My question is: if I use a List<int>, am I at risk of running into thread-safety problems once I start playing with the MaxDegreeOfParallelism setting, and would I be better off using one of the concurrent collections? Or is this approach flawed in some other way?

Recommended answer

I converted a retry-block implementation from an answer to a similar question, to work with Stephen Cleary's Try types as input and output. The method CreateRetryTransformBlock returns a TransformBlock<Try<TInput>, Try<TOutput>>, and the method CreateRetryActionBlock returns something that is practically an ActionBlock<Try<TInput>>.

Three more options are available on top of the standard execution options: MaxAttemptsPerItem, MinimumRetryDelay and MaxRetriesTotal.

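using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Also required: a using directive for the namespace of the Try library.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions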
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The minimum delay duration before retrying an item.</summary>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MinimumRetryDelay));

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
    {
        // Continue on captured context after every await
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        if (item.IsException || result1.IsValue) return result1;
        for (int i = 2; i <= maxAttemptsPerItem; i++)
        {
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        }
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    return block;

    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
    {
        await semaphore.WaitAsync(internalCTS.Token);
        try
        {
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            {
                ObserveNewException(result.Exception);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }

    void ObserveNewException(Exception ex)
    {
        if (maxRetriesTotal == -1) return;
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        if (newCount == (uint)maxRetriesTotal + 1)
        {
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        }
        throw new OperationCanceledException();
    }
}

public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    block.LinkTo(nullTarget);
    return block;
}

Usage example:

var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
{
    return await ProcessAsync(value);
}));

downloadBlock.LinkTo(processBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });

To keep things simple, when an item has been retried the maximum number of times, the exception that is preserved is the first one that occurred. The subsequent exceptions are lost. In most cases the lost exceptions are going to be of the same type as the first one anyway.
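
As for the thread-safety part of the question: with the default MaxDegreeOfParallelism of 1 a block runs its delegate for one item at a time, so a plain List<int> that is only read after awaiting Completion should be fine. Once MaxDegreeOfParallelism is raised above 1 on the block that records the failures, List<int>.Add can be called from several threads at once, and then either a lock or a concurrent collection is needed. Below is a minimal sketch of the concurrent-collection variant. To stay self-contained it does not use the Try library; a tuple (Item, Error) stands in for Try<int>, and the names FaultedItemsDemo, Process and RunAsync are made up for this illustration. Carrying the original item next to the outcome is an assumption of the sketch, made so that a failure can be traced back to its input.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultedItemsDemo
{
    // Hypothetical stand-in for the real work: fails for every third item.
    private static int Process(int item)
    {
        if (item % 3 == 0)
            throw new InvalidOperationException($"Item {item} failed.");
        return item * 10;
    }

    public static async Task<int[]> RunAsync(int itemCount)
    {
        // Thread-safe collection: Enqueue may be called from several threads at once.
        var faultedItems = new ConcurrentQueue<int>();

        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        // Catch the exception and pass it downstream together with the original item.
        var processBlock = new TransformBlock<int, (int Item, Exception Error)>(item =>
        {
            try { Process(item); return (item, (Exception)null); }
            catch (Exception ex) { return (item, ex); }
        }, options);

        // Runs with parallelism 4 as well, hence the concurrent collection.
        var resultsBlock = new ActionBlock<(int Item, Exception Error)>(outcome =>
        {
            if (outcome.Error != null) faultedItems.Enqueue(outcome.Item);
        }, options);

        processBlock.LinkTo(resultsBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= itemCount; i++) processBlock.Post(i);
        processBlock.Complete();
        await resultsBlock.Completion;

        // All processing has finished here, so reading the queue is safe.
        return faultedItems.OrderBy(x => x).ToArray();
    }
}
```

Calling await FaultedItemsDemo.RunAsync(10) returns the items 3, 6 and 9, which could then be fed into a new pipeline. ConcurrentQueue<int> is used here only because the order of insertion does not matter; a lock around a List<int> would work just as well.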