TPL DataFlow Queue with Postponement

Question

I am processing a queue concurrently using an ActionBlock.

The one catch here is that when processing an item in the queue, I may want to wait until a dependency is satisfied by the processing of another item in the queue.

I think I should be able to do this with the TPL DataFlow library with linking, postponement and release of postponement but I'm not sure what constructs to use.

In pseudocode:

using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public class Item 
{
    public string Name { get; set; }
    public List<string> DependsOn = new List<string>();
}

// HasActionBlockProcessedAllDependencies and DoWork are placeholders.
var block = new ActionBlock<Item>(o => {
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn)) 
    {
       // enqueue a callback when ALL dependencies have been completed
    } 
    else 
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

var items = new[] 
{
    new Item { Name = "Apple", DependsOn = { "Pear" } }, // Apple depends on Pear
    new Item { Name = "Pear" }
};

Answer

I am not sure if this will be helpful to you, but here is a custom DependencyTransformBlock class that knows about the dependencies between the items it receives, and processes each one only after its dependencies have been successfully processed. This custom block supports all the built-in functionality of a normal TransformBlock, except for the EnsureOrdered option.
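To make the idea concrete, here is a minimal sequential sketch of the kind of bookkeeping the block performs. It is not the block itself: there is no Dataflow and no concurrency, and the (Name, DependsOn) tuples are hypothetical stand-ins for the question's Item class. As in the block, each arriving item records only its not-yet-completed dependencies, and completing an item releases every dependent whose last pending dependency it was. The local Queue plays roughly the role of the block's middle BufferBlock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical arrivals; note that "Apple" and "Banana" arrive before "Pear".
var arrivals = new (string Name, string[] DependsOn)[]
{
    ("Apple", new[] { "Pear" }),
    ("Banana", new[] { "Apple", "Pear" }),
    ("Pear", Array.Empty<string>()),
};

var pending = new Dictionary<string, HashSet<string>>();  // item -> uncompleted dependencies
var dependents = new Dictionary<string, List<string>>();  // dependency -> items waiting on it
var completed = new HashSet<string>();
var processedOrder = new List<string>();

// "Processes" a ready item, then releases dependents that became ready.
void Process(string start)
{
    var ready = new Queue<string>();
    ready.Enqueue(start);
    while (ready.Count > 0)
    {
        var current = ready.Dequeue();
        processedOrder.Add(current); // DoWork(current) would go here
        completed.Add(current);
        if (!dependents.TryGetValue(current, out var waiting)) continue;
        foreach (var dependent in waiting)
        {
            var deps = pending[dependent];
            deps.Remove(current);
            if (deps.Count == 0) ready.Enqueue(dependent);
        }
        dependents.Remove(current);
    }
}

foreach (var (name, dependsOn) in arrivals)
{
    // Only dependencies that have not completed yet are recorded.
    var uncompleted = new HashSet<string>(dependsOn.Where(d => !completed.Contains(d)));
    pending[name] = uncompleted;
    foreach (var dep in uncompleted)
    {
        if (!dependents.TryGetValue(dep, out var list))
            dependents[dep] = list = new List<string>();
        list.Add(name);
    }
    if (uncompleted.Count == 0) Process(name);
}

Console.WriteLine(string.Join(" -> ", processedOrder)); // Pear -> Apple -> Banana
```

Nothing runs until "Pear" arrives; its completion then releases "Apple", whose completion in turn releases "Banana". In the real block the released items are processed concurrently, up to MaxDegreeOfParallelism.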

The constructors of this class accept a Func<TInput, TKey> lambda for retrieving the key of each item, and a Func<TInput, IReadOnlyCollection<TKey>> lambda for retrieving its dependencies. An optional IEqualityComparer<TKey> determines when two keys are equal. The keys are expected to be unique; if a duplicate key is found, the block completes with failure.
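The sketch below illustrates how the key comparer affects the uniqueness rule. EnsureUniqueKeys is a hypothetical helper, not part of the block; the block itself performs the check on its internal dictionary (through the HasInput flag, because an entry may already exist as a placeholder created by a dependent), but the effect of the comparer is the same.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical keys, as a key selector might produce them.
var keys = new[] { "Apple", "Pear", "apple" };

// Hypothetical helper mirroring the block's rule: a key that is equal
// (per the comparer) to an earlier one throws InvalidOperationException.
void EnsureUniqueKeys(IEnumerable<string> source, IEqualityComparer<string> comparer)
{
    var seen = new HashSet<string>(comparer);
    foreach (var key in source)
    {
        // HashSet<T>.Add returns false when an equal element is already present.
        if (!seen.Add(key))
            throw new InvalidOperationException($"Duplicate key ({key}).");
    }
}

EnsureUniqueKeys(keys, StringComparer.Ordinal); // no exception: the keys differ by case

bool ignoreCaseRejected = false;
try
{
    EnsureUniqueKeys(keys, StringComparer.OrdinalIgnoreCase);
}
catch (InvalidOperationException ex)
{
    ignoreCaseRejected = true;
    Console.WriteLine(ex.Message); // Duplicate key (apple).
}
```

With StringComparer.OrdinalIgnoreCase as the keyComparer, "Apple" and "apple" are the same item, and a dependency written as "pear" would still match an item named "Pear".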

In case of circular dependencies between items, the affected items will remain unprocessed. The TInput[] Unprocessed property can be used to retrieve the unprocessed items after the block has completed. An item can also remain unprocessed if any of its dependencies is never supplied.
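Why such items are never released can be seen with a small batch, fixed-point computation. This is an illustration under assumed inputs, not the block's incremental mechanism: the block never detects cycles explicitly, its waiting items simply never see their dependency count drop to zero. The item names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// "X" and "Y" depend on each other (a cycle); "Kiwi" depends on "Mango",
// which is never supplied.
var items = new (string Name, string[] DependsOn)[]
{
    ("Apple", new[] { "Pear" }),
    ("Pear", Array.Empty<string>()),
    ("X", new[] { "Y" }),
    ("Y", new[] { "X" }),
    ("Kiwi", new[] { "Mango" }),
};

// Repeatedly "process" every item whose dependencies are all done,
// until a full pass makes no progress.
var done = new HashSet<string>();
bool progress = true;
while (progress)
{
    progress = false;
    foreach (var (name, dependsOn) in items)
    {
        if (!done.Contains(name) && dependsOn.All(d => done.Contains(d)))
        {
            done.Add(name);
            progress = true;
        }
    }
}

// Whatever is left can never become ready.
string[] unprocessed = items.Where(i => !done.Contains(i.Name)).Select(i => i.Name).ToArray();
Console.WriteLine(string.Join(", ", unprocessed)); // X, Y, Kiwi
```

Like the block's Unprocessed property, which only returns items that were actually received, the result lists "Kiwi" but not the missing "Mango".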

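using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DependencyTransformBlock<TInput, TKey, TOutput> :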
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    private readonly ITargetBlock<TInput> _inputBlock;
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;

    private int _pendingCount = 1;
    // The initial 1 represents the completion of the _inputBlock

    private class Item
    {
        public TKey Key;
        public TInput Input;
        public bool HasInput;
        public bool IsCompleted;
        public HashSet<Item> Dependencies;
        public HashSet<Item> Dependents;

        public Item(TKey key) => Key = key;
    }

    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions =
            dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

        _items = new Dictionary<TKey, Item>(keyComparer);

        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            if (dependent.HasInput
                                && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    item.Input = default; // Cleanup
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    // Constructor with synchronous lambda
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null) : this(
            input => Task.FromResult(transform(input)),
            keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
    }

    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput && !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else
            if (condition == null || condition()) target.Complete();
    }

    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}

Usage example:

var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

//...

block.LinkTo(DataflowBlock.NullTarget<Item>());

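In this example the block is linked to a NullTarget in order to discard its output, so that it essentially becomes an ActionBlock equivalent. Without a linked target, the results would accumulate in the block's output buffer, and its Completion task would never complete.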
