Data Propagation in a TPL Dataflow Pipeline with BatchBlock.TriggerBatch()


Question

In my producer-consumer scenario, I have multiple consumers, and each consumer sends an action to external hardware, which may take some time. My pipeline looks roughly like this:

BatchBlock --> TransformBlock --> BufferBlock --> (several) ActionBlocks

I have set the BoundedCapacity of my ActionBlocks to 1. What I want, in theory, is for the BatchBlock to send a group of items to the TransformBlock only when one of my ActionBlocks is available for work. Until then, the BatchBlock should keep buffering elements and not pass them on to the TransformBlock. My batch sizes are variable. Because a batch size is mandatory, I have set a very high upper limit on the BatchBlock batch size, but I do not want to reach that limit; I would like to trigger my batches depending on the availability of the ActionBlocks performing the task.

I have achieved this with the TriggerBatch() method: I call BatchBlock.TriggerBatch() as the last action in my ActionBlock. Interestingly, after several days of working properly, the pipeline came to a halt. On checking, I found that sometimes the inputs to the BatchBlock arrive after the ActionBlocks have finished their work. In that case the ActionBlocks do call TriggerBatch at the end of their work, but since the BatchBlock has no input at that point, the call to TriggerBatch is fruitless. Later, when inputs do flow into the BatchBlock, there is no one left to call TriggerBatch and restart the pipeline. I was looking for a way to check whether anything is actually present in the input buffer of the BatchBlock, but no such feature is available, and I could not find a way to check whether TriggerBatch was fruitful either.
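The stall described above can be reproduced in isolation. The following is a minimal sketch, assuming a default (greedy, unbounded) BatchBlock and a hypothetical class name `LostTriggerDemo` that is not part of the original code: it triggers before any input exists, posts an item afterwards, and checks whether a batch is available.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class (not from the original post).
public static class LostTriggerDemo
{
    // Returns whether a batch was available after the early trigger,
    // and whether one was available after a second trigger.
    public static (bool AfterEarlyTrigger, bool AfterSecondTrigger) Run()
    {
        var batcher = new BatchBlock<int>(1000);

        batcher.TriggerBatch();   // nothing is buffered yet, so no batch is made
        batcher.Post(42);         // the input arrives only afterwards

        // The item now sits in the input queue; nothing has been emitted.
        bool afterEarly = batcher.TryReceive(out _);

        // Only another trigger releases the buffered item.
        batcher.TriggerBatch();
        bool afterSecond = batcher.TryReceive(out var batch) && batch.Length == 1;

        return (afterEarly, afterSecond);
    }
}
```

In this sketch the early trigger is not remembered by the block, which is the same situation as an ActionBlock finishing its work before any new input has arrived.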

Could anyone suggest a possible solution to my problem? Unfortunately, using a timer to trigger batches is not an option for me. Except at the start of the pipeline, the throttling should be governed only by the availability of one of the ActionBlocks.

The sample code is here:

    static BatchBlock<int> _groupReadTags;

    static void Main(string[] args)
    {
        _groupReadTags = new BatchBlock<int>(1000);

        var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
        BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
        int batchNo = 1;


        TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
        {

            Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
            //_groupReadTags.TriggerBatch();
            int sum = 0;

            foreach (int item in list)
            {
                Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
                sum += item;

            }
            batchNo++;
            return sum;

        });

        ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
        {
            Console.WriteLine("Number from ONE :{0}", x);
            await Task.Delay(500);
            Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);
            _groupReadTags.TriggerBatch();
        }, consumerOptions);

        ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
        {
            Console.WriteLine("Number from TWO :{0}", x);
            await Task.Delay(2000);
            _groupReadTags.TriggerBatch();

        }, consumerOptions);

        _groupReadTags.LinkTo(_workingBlock);
        _workingBlock.LinkTo(_frameBuffer);
        _frameBuffer.LinkTo(_worker1);
        _frameBuffer.LinkTo(_worker2);

        _groupReadTags.Post(10);
        _groupReadTags.Post(20);
        _groupReadTags.TriggerBatch();

        Task postingTask = new Task(() => PostStuff());
        postingTask.Start();
        Console.ReadLine();

    }



    static void PostStuff()
    {
        for (int i = 0; i < 10; i++)
        {
            _groupReadTags.Post(i);
            Thread.Sleep(100);
        }

        Parallel.Invoke(
            () => _groupReadTags.Post(100),
            () => _groupReadTags.Post(200),
            () => _groupReadTags.Post(300),
            () => _groupReadTags.Post(400),
            () => _groupReadTags.Post(500),
            () => _groupReadTags.Post(600),
            () => _groupReadTags.Post(700),
            () => _groupReadTags.Post(800)
        );
    }

Recommended answer

I have found that using TriggerBatch in this way is unreliable:

    _groupReadTags.Post(10);
    _groupReadTags.Post(20);
    _groupReadTags.TriggerBatch();

Apparently TriggerBatch is intended to be used inside the block, not outside it like this. I have seen this result in odd timing issues, such as items from the next batch being included in the current batch even though TriggerBatch was called first.

Please see my answer to this question for an alternative using DataflowBlock.Encapsulate: BatchBlock produces batch with elements sent after TriggerBatch()
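The linked answer is not reproduced here. As a rough illustration only, one way to build such a block with DataflowBlock.Encapsulate is to keep the buffer yourself and remember a request that arrives while the buffer is empty, so the next incoming item is released immediately. The names `DemandBatcher`, `Create`, and `RequestBatch` below are hypothetical, and this sketch is an assumption about how the idea could look, not the code from that answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public static class DemandBatcher
{
    // Returns the encapsulated block and a delegate that a consumer
    // calls when it is free to take another batch.
    public static (IPropagatorBlock<T, T[]> Block, Action RequestBatch) Create<T>()
    {
        var gate = new object();
        var pending = new List<T>();
        int waitingConsumers = 0;           // requests made while the buffer was empty
        var output = new BufferBlock<T[]>();

        // Emits everything buffered as one batch. Caller must hold 'gate'.
        void Flush()
        {
            output.Post(pending.ToArray());
            pending.Clear();
        }

        var input = new ActionBlock<T>(item =>
        {
            lock (gate)
            {
                pending.Add(item);
                if (waitingConsumers > 0)   // honor an earlier, unfulfilled request
                {
                    waitingConsumers--;
                    Flush();
                }
            }
        });

        // When the input side completes, emit leftovers and complete the output.
        input.Completion.ContinueWith(_ =>
        {
            lock (gate) { if (pending.Count > 0) Flush(); }
            output.Complete();
        });

        void RequestBatch()
        {
            lock (gate)
            {
                if (pending.Count > 0) Flush();
                else waitingConsumers++;    // remembered instead of being lost
            }
        }

        return (DataflowBlock.Encapsulate(input, output), RequestBatch);
    }
}
```

Under these assumptions, each ActionBlock would call `RequestBatch()` where it currently calls `TriggerBatch()`, and once per consumer at startup. A request made on an empty buffer is then served by the next item that arrives, without a timer.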
