TPL Dataflow, alternative to JoinBlock limitations?

Question

I am looking for an alternative to JoinBlock that n TransformBlocks can link to, and that joins/merges the messages from all of those source blocks so that a collection of them can be passed on to another dataflow block.

JoinBlock does the job fine, but it is limited to hooking up at most 3 source blocks. It also suffers from quite a number of inefficiencies (it is very slow even when joining value types (ints) from just 2 source blocks). Is there a way to have Tasks returned from the TransformBlocks and wait until all TransformBlocks have a completed task to pass on before accepting the Task<item>?

Any alternative ideas? I potentially have 1-20 such transform blocks whose items I need to join together before passing on the joined item collection. Each transform block is guaranteed to return exactly one output item for each input item it transforms.

Clarification, as requested:

Per one of my previous questions, I set up my JoinBlocks as follows:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    // Multiply before dividing so integer division does not truncate the rate to a multiple of 1000.
    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
    Console.ReadLine();
}

Recommended answer

One way to do this is to use BatchBlock with Greedy set to false. In this configuration, the block doesn't do anything until n items from n different blocks are waiting to be consumed (where n is the batch size you set when creating the BatchBlock). When that happens, it consumes all n items at once and produces an array containing all of them.
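A minimal sketch of that configuration, assuming the same broadcast-then-transform layout as in the question. The class name NonGreedyBatchJoin, the method RunAsync, and the per-branch multiplication are hypothetical and only there to make the example self-contained; BatchBlock, GroupingDataflowBlockOptions.Greedy, and the linking calls come from System.Threading.Tasks.Dataflow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NonGreedyBatchJoin
{
    // Fans each input out to `branchCount` transform blocks and joins their
    // outputs into arrays of `branchCount` items, one item per branch.
    public static async Task<List<int[]>> RunAsync(int branchCount, int itemCount)
    {
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        var batches = new List<int[]>();

        var broadcast = new BroadcastBlock<int>(i => i);

        // Batch size equals the number of sources; Greedy = false makes the
        // block wait until that many distinct sources have an item on offer.
        var join = new BatchBlock<int>(
            branchCount,
            new GroupingDataflowBlockOptions { Greedy = false });

        // Default ActionBlock parallelism is 1, so List.Add is not called concurrently.
        var collector = new ActionBlock<int[]>(batch => batches.Add(batch));
        join.LinkTo(collector, propagate);

        var branches = new List<TransformBlock<int, int>>();
        for (int b = 0; b < branchCount; b++)
        {
            int factor = b + 1; // hypothetical per-branch work: multiply by (index + 1)
            var branch = new TransformBlock<int, int>(i => i * factor);
            broadcast.LinkTo(branch, propagate);
            // Completion is not propagated here: the join block must only be
            // completed once every branch has finished, not just the first one.
            branch.LinkTo(join);
            branches.Add(branch);
        }

        for (int i = 1; i <= itemCount; i++)
        {
            broadcast.Post(i);
        }
        broadcast.Complete();

        // A branch completes only after all of its output has been consumed.
        await Task.WhenAll(branches.Select(b => b.Completion));
        join.Complete();
        await collector.Completion;

        return batches;
    }
}
```

Calling `await NonGreedyBatchJoin.RunAsync(5, 3)` should yield three arrays of five items each. The order of items inside each array is not specified, which is the caveat discussed next.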

One caveat with this solution is that the resulting array is not ordered by source: you won't know which item came from which source. I also have no idea how its performance compares with JoinBlock; you'll have to test that yourself. (Though I would understand if using BatchBlock this way were slower, because of the overhead necessary for non-greedy consumption.)
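One possible workaround for the ordering caveat, which is an assumption and not part of the original answer: have each TransformBlock emit its own index together with its result, and restore the order after batching. The helper below is a hypothetical sketch (the names BatchOrdering and OrderBySource are made up) that could serve as the body of a TransformBlock placed after the BatchBlock.

```csharp
using System;

public static class BatchOrdering
{
    // Takes one batch of (source index, value) pairs in arbitrary order and
    // returns the values positioned by source index.
    // Assumes the indices are exactly 0..batch.Length-1, each appearing once.
    public static T[] OrderBySource<T>((int Source, T Value)[] batch)
    {
        var ordered = new T[batch.Length];
        var seen = new bool[batch.Length];

        foreach (var (source, value) in batch)
        {
            if (source < 0 || source >= batch.Length || seen[source])
            {
                throw new ArgumentException(
                    "Batch must contain exactly one item per source index.", nameof(batch));
            }

            seen[source] = true;
            ordered[source] = value;
        }

        return ordered;
    }
}
```

With this in place, each branch would return something like `(branchIndex, result)` instead of the bare result, at the cost of one extra tuple per item.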