TPL Dataflow: Bounded capacity and waiting for completion

Question

Below I have replicated a real life scenario as a LINQPad script for the sake of simplicity:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = 25;

var action1 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay1); Interlocked.Increment(ref counter1);});
var action2 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay2); Interlocked.Increment(ref counter2);});

var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);

batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions); 
var bufferBlock2 = new BufferBlock<string>(dbOptions); 

bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);

bcBlock.LinkTo(bufferBlock1, dlOptions);
bcBlock.LinkTo(bufferBlock2, dlOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

mainBlock.Dump("Main Block");
bcBlock.Dump("Broadcast Block");
bufferBlock1.Dump("Buffer Block 1");
bufferBlock2.Dump("Buffer Block 2");
actionBlock1.Dump("Action Block 1");
actionBlock2.Dump("Action Block 2");

foreach(var i in Enumerable.Range(1, total))
  await mainBlock.SendAsync(i, cts.Token);

mainBlock.Complete();

await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

counter1.Dump("Counter 1");
counter2.Dump("Counter 2");

There are two issues with this code:

  1. Although I limited BoundedCapacity of all appropriate blocks to 10 elements, it seems like I can push all 1,000,000 messages almost at once. Is this expected behavior?
  2. Although the entire network is configured to propagate completion, it seems like all blocks complete almost immediately after calling mainBlock.Complete(). I expect both counter1 and counter2 to be equal to total. Is there a way to achieve such behavior?

Recommended answer

Yes, this is the expected behavior, because of the BroadcastBlock:

Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.

This means that if you link BroadcastBlock to blocks with BoundedCapacity, you will lose messages.
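
The loss can be reproduced in isolation. The following is a minimal sketch, not part of the original answer: the class name BroadcastLossDemo, the item count, and the 5 ms delay are illustrative assumptions. It links an unbounded BroadcastBlock to one slow ActionBlock with BoundedCapacity = 1 and counts how many items actually arrive.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; names and numbers are illustrative only.
public static class BroadcastLossDemo
{
    // Returns how many of the `sent` items reached the slow target.
    public static async Task<int> RunAsync(int sent = 100)
    {
        var received = 0;

        // Slow consumer that can hold only one item at a time.
        var slowTarget = new ActionBlock<int>(
            async _ =>
            {
                await Task.Delay(5);
                Interlocked.Increment(ref received);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Unbounded BroadcastBlock: it accepts every item but keeps only the latest.
        var broadcast = new BroadcastBlock<int>(x => x);
        broadcast.LinkTo(slowTarget, new DataflowLinkOptions { PropagateCompletion = true });

        // Post is accepted immediately, regardless of how busy the target is.
        for (var i = 0; i < sent; i++)
            broadcast.Post(i);

        broadcast.Complete();
        await slowTarget.Completion;

        return received;
    }
}
```

On a typical run the returned count is well below the number of items posted, although the exact value depends on timing. This also explains why the network in the question appears to finish early: the overwritten items never reach the action blocks.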

To fix that, you could create a custom block that behaves like BroadcastBlock but guarantees delivery to all targets. Doing that is not trivial, so you might be satisfied with a simpler variant (originally from my old answer):

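using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    // Snapshot the targets so the sequence is enumerated only once.
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            // Offer the item to every target in turn. SendAsync waits
            // asynchronously until a bounded target has room, instead of
            // letting the item be overwritten.
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    // Propagate completion (or the fault) to all targets manually,
    // because the targets are not linked with LinkTo.
    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}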

In your case, that would be:

var bcBlock = CreateGuaranteedBroadcastBlock(
    new[] { bufferBlock1, bufferBlock2 }, dbOptions);
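
To check the effect end to end, the helper can be exercised on a scaled-down network. This is a sketch under stated assumptions rather than the asker's full pipeline: the class name GuaranteedBroadcastDemo, the default total of 200, and the use of two plain ActionBlock targets in place of the buffer and batch blocks are all illustrative. The helper is reproduced from the answer so that the block is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; only CreateGuaranteedBroadcastBlock comes from the answer.
public static class GuaranteedBroadcastDemo
{
    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                    await target.SendAsync(item);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = options.BoundedCapacity,
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

    // Sends `total` items through the broadcast and returns both target counters.
    public static async Task<(int First, int Second)> RunAsync(int total = 200, int capacity = 10)
    {
        var counter1 = 0;
        var counter2 = 0;
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity };

        // Two bounded consumers with different (assumed) processing delays.
        var target1 = new ActionBlock<string>(
            async _ => { await Task.Delay(1); Interlocked.Increment(ref counter1); }, options);
        var target2 = new ActionBlock<string>(
            async _ => { await Task.Delay(2); Interlocked.Increment(ref counter2); }, options);

        var broadcast = CreateGuaranteedBroadcastBlock<string>(
            new ITargetBlock<string>[] { target1, target2 },
            new DataflowBlockOptions { BoundedCapacity = capacity });

        // SendAsync applies back-pressure: it waits while the broadcast block is full.
        for (var i = 1; i <= total; i++)
            await broadcast.SendAsync(i.ToString());

        broadcast.Complete();
        await Task.WhenAll(target1.Completion, target2.Completion);

        return (counter1, counter2);
    }
}
```

Both counters should equal total here, because each item is awaited into every target before the next one is taken. The trade-off is that the slowest target throttles the whole network.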
