TPL数据流,只有当所有源数据块完成保证完成 [英] TPL Dataflow, guarantee completion only when ALL source data blocks completed

查看:490
本文介绍了TPL数据流,只有当所有源数据块完成保证完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我怎样才能重新写代码,当两个transformblocks完成了代码完成?我认为完成意味着它标记为完成和走出队列是空的?

How can I re-write the code that the code completes when BOTH transformblocks completed? I thought completion means that it is marked complete AND the " out queue" is empty?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}



我编辑的代码,添加一个输入缓冲区计数每个变换块。很明显,所有100项传输到每个变换块的。但只要transformblocks之一完成的processorblock不接受任何更多的项目,而是在不完全transformblock的输入缓冲器只是刷新输入缓冲器。

I edited the code, adding an input buffer count for each transform block. Clearly all 100 items are streamed to each of the transform blocks. But as soon as one of the transformblocks finishes the processorblock does not accept any more items and instead the input buffer of the incomplete transformblock just flushes the input buffer.

推荐答案

这个问题正是casperOne在他的回答说。一旦第一个变换块完成后,处理器模块进入整理模式:它会处理在其输入队列中剩余的项目,但不会接受任何新项目

The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into "finishing mode": it will process remaining items in its input queue, but it won't accept any new items.

有一个比分裂,虽然在你的两个处理器模块解决方法更为简单:不要设置 PropagateCompletion ,而是手动设置处理器模块完成当两个变换块完成:

There is a simpler fix than splitting your processor block in two though: don't set PropagateCompletion, but instead set completion of the processor block manually when both transform blocks complete:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());

这篇关于TPL数据流,只有当所有源数据块完成保证完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆