如何才能TPL数据流块下游得到源所产生的数据? [英] How can a TPL Dataflow block downstream get data produced by a source?

查看:158
本文介绍了如何才能TPL数据流块下游得到源所产生的数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我处理使用TPL数据流的图像。我收到的处理请求,从流中读取的图像,适用于多种变换,然后写入生成的图像到另一个流:

I'm processing images using TPL Dataflow. I receive a processing request, read an image from a stream, apply several transformations, then write the resulting image to another stream:

Request -> Stream -> Image -> Image ... -> Stream

有关,我用了块:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>

现在的问题是初始要求是包含必要建立一些数据所产生的再加上一些额外的信息,我需要在这一点上。我一定要通过原始的要求(或其他一些上下文对象)跌在所有行到 writerBlock 其他块是这样的:

The problem is the initial Request is what contains some data necessary to create the resulting Stream along with some additional info I need at that point. Do I have to pass the original Request (or some other context object) down the line to the writerBlock across all the other blocks like this:

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...

(这是丑陋的),或者是有办法的第一个块链接到最后一个(或者,概括,对需要附加数据的那些)?

(which is ugly), or is there a way to link the first block to the last one (or, generalizing, to the ones that need the additional data)?

推荐答案

是的,你pretty的很多需要做你所描述的,从每块传递更多的数据到下一个。

Yes, you pretty much need to do what you described, passing the additional data from every block to the next one.

但使用了几个辅助方法,可以让这个更简单:

But using a couple of helper methods, you can make this much simpler:

public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
    CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
        input => Tuple.Create(transform(input), input));
}

public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
    CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}

的签名看起来令人生畏,但他们实际上没有那么糟糕。

The signatures look daunting, but they are actually not that bad.

此外,您可能要添加选项传递给创建块采取异步委托过载或重载。

Also, you might want to add overloads that pass options to the created block, or overloads that take async delegates.

例如,如果您想进行多项使用单独的块某些操作,同时通过原来的号码,一路上,你可以这样做:

For example, if you wanted to perform some operations on a number using separate blocks, while passing the original number along the way, you could do something like:

var source = new BufferBlock<int>();
var divided = CreateExtendedSource<int, double>(i => i / 2.0);
var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

source.LinkTo(divided);
divided.LinkTo(formatted);
formatted.LinkTo(writer);

for (int i = 0; i < 10; i++)
    source.Post(i);

正如你所看到的,你的lambda表达式(除了最后一个)只涉及当前值(INT , <$ C C $>双字符串,根据流水线的阶段),原始的值(总 INT )自动传递。在任何时候,你可以使用创建的块使用正常的构造函数访问这两个值(如最后 ActionBlock 中的例子)。

As you can see, your lambdas (except for the last one) deal only with the "current" value (int, double or string, depending on the stage of the pipeline), the "original" value (always int) is passed automatically. At any moment, you can use block created using the normal constructor to access both values (like the final ActionBlock in the example).

(即 BufferBlock 实际上不是必要的,但我把它添加到您的设计更紧密地匹配。)

(That BufferBlock isn't actually necessary, but I added it to more closely match your design.)

这篇关于如何才能TPL数据流块下游得到源所产生的数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆