如何使用 TPL 数据流库指定无序执行块? [英] How can I specify an unordered Execution Block using the TPL Dataflow Library?

查看:37
本文介绍了如何使用 TPL 数据流库指定无序执行块?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想设置一个 TransformBlock 来并行处理它的项目.因此,我将 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 设置为 > 1.我不关心消息的顺序,但是 文档说:

I want to set up a TransformBlock that processes its item in parallel. Thus, I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to > 1. I don't care about the order of the messages but the documentation says:

当您指定的最大并行度大于 1 时,会同时处理多条消息,因此,消息可能不会按照接收顺序进行处理.但是,从块中输出消息的顺序将是正确排序的.

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

正确排序"是否意味着如果队列中有一条消息需要较长的处理时间,则直到处理完这条消息后才会输出其他消息?

Does "correctly ordered" mean that if there is one message in the queue that needs long processing time, further messages are not output until this one message is processed?

如果是这样,我如何指定一个不关心排序的执行块(例如 TransformBlock)?还是必须在消费端注明不在乎点单?

And if so, how can I specify an Execution Block (for example a TransformBlock) that does not care about the ordering? Or do I have to specify at the consumption end that I don't care about ordering?

推荐答案

库中没有这样的块,但您可以通过组合一个 ActionBlock 和一个 BufferBlock 轻松地自己创建一个.类似的东西:

There is no such block in the library, but you can easily create one yourself by combining an ActionBlock and a BufferBlock. Something like:

public static IPropagatorBlock<TInput, TOutput>
    CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
    var buffer = new BufferBlock<TOutput>(options);
    var action = new ActionBlock<TInput>(
        async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

    action.Completion.ContinueWith(
        t =>
        {
            IDataflowBlock castedBuffer = buffer;

            if (t.IsFaulted)
            {
                castedBuffer.Fault(t.Exception);
            }
            else if (t.IsCanceled)
            {
                // do nothing: both blocks share options,
                // which means they also share CancellationToken
            }
            else
            {
                castedBuffer.Complete();
            }
        });

    return DataflowBlock.Encapsulate(action, buffer);
}

这样,一旦一个项目被 ActionBlock 处理,它就会立即被移动到 BufferBlock,这意味着不保持排序.

This way, once an item is processed by the ActionBlock, it's immediately moved to the BufferBlock, which means ordering is not maintained.

这段代码的一个问题是它没有很好地观察 BoundedCapacity 集合:实际上,这个块的容量是选项中设置的容量的两倍(因为两个块中的每一个都有单独的容量).

One issue with this code is that it doesn't observe the set BoundedCapacity well: in effect, the capacity of this block is twice the capacity set in options (because each of the two blocks has a separate capacity).

这篇关于如何使用 TPL 数据流库指定无序执行块?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆