BatchBlock produces batch with elements sent after TriggerBatch()

Problem description

I have a Dataflow pipeline consisting of several blocks. As elements flow through the pipeline, I want to group them by field A. To do this I use a BatchBlock with a high BoundedCapacity. It holds my elements until I decide they should be released, at which point I call the TriggerBatch() method.

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

    GroupingBlock.SendAsync(data).Wait(SendTimeout);
}

This is how it looks. The problem is that the batch produced sometimes contains the next posted element, which shouldn't be there.

To illustrate:

BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);

At this point I expect my batch to be {A,A,A}, but it is {A,A,A,B}.

It is as if TriggerBatch() were asynchronous and SendAsync were in fact executed before the batch was actually made.

How can I solve this? I obviously don't want to put Task.Wait(x) in there (I tried, and it works, but then performance is poor, of course).

Recommended answer

I ran into this issue too, by calling TriggerBatch in the wrong place. As mentioned, the SlidingWindow example using DataflowBlock.Encapsulate is the answer here, but it took some time to adapt, so I thought I'd share my completed block.

My ConditionalBatchBlock creates batches up to a maximum size, possibly sooner if a certain condition is met. In my specific scenario I needed to create batches of 100, but always create new batches when certain changes in the data were detected.

public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    var queue = new Queue<T>();

    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition;
        // skip the check while the queue is empty so no empty batch is emitted
        if (queue.Count > 0 && condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

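    // when the target finishes, send any remaining items and complete the source;
    // if the target faulted, pass the fault on instead of completing normally
    target.Completion.ContinueWith(async t =>
    {
        if (t.IsFaulted)
        {
            ((IDataflowBlock)source).Fault(t.Exception);
            return;
        }

        if (queue.Count > 0)
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });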

    return DataflowBlock.Encapsulate(target, source);
}

The condition parameter may be simpler in your case. I needed to look at the queue as well as the current item to decide whether to create a new batch.
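For the grouping described in the question, the condition could be as small as a key comparison. The following is only a sketch under assumptions: the `Item` record, its `A` property, and the `GroupingConditions.KeyChanged` name are hypothetical stand-ins for the asker's `TStronglyTyped` and `ShouldCreateNewGroup`, which the question does not show. It also assumes every queued item shares one key, so looking at the first queued item is enough.

```csharp
using System.Collections.Generic;

// Hypothetical element type; "A" stands for the grouping field from the question.
// Records require C# 9 or later; a plain class works the same way.
public record Item(string A, int Value);

public static class GroupingConditions
{
    // Returns true when the incoming item belongs to a different group
    // than the items already waiting in the queue.
    public static bool KeyChanged(Queue<Item> queue, Item current)
    {
        // An empty queue has no group yet, so there is nothing to flush.
        if (queue.Count == 0)
            return false;

        // Assumption: all queued items share one key, so Peek() is representative.
        return queue.Peek().A != current.A;
    }
}
```

It would be passed in as `CreateConditionalBatchBlock<Item>(100, GroupingConditions.KeyChanged)`. Because the check runs inside the block, right before the item is enqueued, there is no separate trigger call that a later send could overtake.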

I use it like this:

public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await ReadDataAsync<T>(conditionalBatchBlock);

    await actionBlock.Completion;
}
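The example above calls ReadDataAsync without showing it. Whatever it does, it has to call Complete() on the block once the input is exhausted. Otherwise the last partial batch is never flushed and `actionBlock.Completion` never finishes. A minimal sketch of such a feeder, assuming the data is available as an in-memory sequence (the `Feeder.FeedAsync` name and its signature are hypothetical, not the answer author's code):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Feeder
{
    // Hypothetical stand-in for ReadDataAsync: pushes every item into the
    // target block, then signals that no more input will arrive.
    public static async Task FeedAsync<T>(IEnumerable<T> items, ITargetBlock<T> target)
    {
        foreach (var item in items)
        {
            // SendAsync waits asynchronously while a bounded target is full.
            // A false result means the target declined the item (for example,
            // because it has already completed), so stop sending.
            if (!await target.SendAsync(item))
                break;
        }

        // Completing the target lets it emit whatever is still queued.
        target.Complete();
    }
}
```

Awaiting SendAsync rather than blocking on it with Wait keeps the calling thread free while a bounded block applies back-pressure.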
