TPL Dataflow,保证所有源数据块完成后才完成 [英] TPL Dataflow, guarantee completion only when ALL source data blocks completed

查看:42
本文介绍了TPL Dataflow,保证所有源数据块完成后才完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当两个转换块完成时,我如何重写代码完成的代码?我认为完成意味着它被标记为完成并且输出队列"为空?

How can I re-write the code that the code completes when BOTH transformblocks completed? I thought completion means that it is marked complete AND the " out queue" is empty?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

我编辑了代码,为每个转换块添加了一个输入缓冲区计数.很明显,所有 100 个项目都被流式传输到每个转换块.但是一旦转换块之一完成,处理器块就不再接受任何项目,而是不完整的转换块的输入缓冲区只是刷新输入缓冲区.

I edited the code, adding an input buffer count for each transform block. Clearly all 100 items are streamed to each of the transform blocks. But as soon as one of the transformblocks finishes the processorblock does not accept any more items and instead the input buffer of the incomplete transformblock just flushes the input buffer.

推荐答案

这个问题正是 casperOne 在他的回答中所说的.一旦第一个转换块完成,处理器块就会进入完成模式":它将处理其输入队列中的剩余项目,但不会接受任何新项目.

The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into "finishing mode": it will process remaining items in its input queue, but it won't accept any new items.

有一个比将处理器块一分为二更简单的解决方法:不要设置 PropagateCompletion,而是在两个转换块完成时手动设置处理器块的完成:

There is a simpler fix than splitting your processor block in two though: don't set PropagateCompletion, but instead set completion of the processor block manually when both transform blocks complete:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());

这篇关于TPL Dataflow,保证所有源数据块完成后才完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆