TPL DataFlow TransformBlock input order and output order item mismatch

Problem description

EDIT: As recommended by svick, I replaced the custom IPropagatorBlock with a simple TransformBlock; however, I still see a mismatch between the order of the input items and the order of the output items. Below are my TransformBlock instantiation and the Func that I pass in:

quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(
    syncExecution,
    new ExecutionDataflowBlockOptions
    {
        SingleProducerConstrained = true,
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
    Symbol symbol = partitionTuple.Item1;
    int partitionIndex = partitionTuple.Item2;

    //Read Binary Data
    byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);

    //Deserialize and return quote list
    List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);

    return quoteList;
});

And this is how I post to the transform block:

quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));

ORIGINAL QUESTION:

Someone helped me with the following custom transform block. The idea is to Post/SendAsync a TInput and have it processed asynchronously, while the custom transform block preserves the order of the posted items when returning the transformed items.

For example, if 1, 2, 3 are posted in that order and the transform function squares each input, the correct output values and order should be 1, 4, 9, regardless of which of the three transform operations completes first.
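The expected behaviour in the 1, 2, 3 example can be checked with a small sketch against a plain TransformBlock. The class and method names (`OrderDemo`, `SquareInOrder`) and the artificial delays are illustrative assumptions, not part of the original code; the delays only make later inputs finish earlier so that any reordering would show up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not taken from the question.
public static class OrderDemo
{
    // Squares each input on a TransformBlock with unbounded parallelism
    // and returns the outputs in the order the block emits them.
    public static List<int> SquareInOrder(IEnumerable<int> inputs)
    {
        var block = new TransformBlock<int, int>(
            x =>
            {
                // Artificial delay: smaller inputs take longer to process.
                Thread.Sleep(Math.Max(0, 50 - 10 * x));
                return x * x;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        int posted = 0;
        foreach (int x in inputs)
        {
            block.Post(x);
            posted++;
        }
        block.Complete();

        // Receive exactly as many items as were posted.
        var results = new List<int>(posted);
        for (int i = 0; i < posted; i++)
        {
            results.Add(block.Receive());
        }
        return results;
    }
}
```

Calling `OrderDemo.SquareInOrder(new[] { 1, 2, 3 })` should yield 1, 4, 9 in that order even though the operation for 3 finishes first, because TransformBlock emits outputs in input order by default.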

However, I suspect there is an error in the code, because the output order is not correct. Worse yet, the position at which the order gets messed up is random, which makes it harder to debug; it presumably reflects the fact that the tasks kicked off to transform input elements into output elements complete in a different order each time.

Can someone please take a look and possibly give some hints about what I am missing here? Thanks.

public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

Solution

I am answering my own question because I found the bug that caused all this trouble. As can be seen from my lambda expression, I read the byte arrays inside the dataflow block, meaning that as soon as the degree of parallelism is greater than 1, byte arrays are read concurrently from the same file on a physical disk. This apparently messes with the positions at which the bytes are read. I set the starting point of each read with br.BaseStream.Seek(...) and read the bytes with br.ReadBytes(numberBytes). Because several operations change the position within the file concurrently, the binary reader most likely reads bytes from the wrong positions, which causes the mix-ups.
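The race described here comes from Seek and ReadBytes being two separate steps on one shared stream position. One way to keep a shared reader safe, which is not the fix used in this answer but a common alternative, is to make the two steps a single critical section. The following is a minimal sketch; the class name `LockedPartitionReader` and the `ReadAt` method are hypothetical.

```csharp
using System.IO;

// Hypothetical wrapper: serializes access to one shared BinaryReader.
public sealed class LockedPartitionReader
{
    private readonly BinaryReader _reader;
    private readonly object _gate = new object();

    public LockedPartitionReader(Stream stream)
    {
        _reader = new BinaryReader(stream);
    }

    // Seek and read happen under the same lock, so no other thread can
    // move the stream position between the two calls.
    public byte[] ReadAt(long offset, int count)
    {
        lock (_gate)
        {
            _reader.BaseStream.Seek(offset, SeekOrigin.Begin);
            return _reader.ReadBytes(count);
        }
    }
}
```

With this approach the reads themselves are still serialized; only the work done after the read runs in parallel.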

I solved the problem by pulling the binary reader out of the lambda expression: I now pass the already-read byte arrays into the expression and use concurrency only for deserialization and merging/sorting. And yes, TransformBlock preserves the order. Thanks to svick for sharing your vast expertise on TPL Dataflow.
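The shape of that fix can be sketched roughly as follows. This is not the original code: the fixed-size partition layout, the `SequentialReadPipeline` class, and the toy `Deserialize` method (one int per 4 bytes) are assumptions made only so the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks.Dataflow;

// Hypothetical pipeline: sequential reads, parallel deserialization.
public static class SequentialReadPipeline
{
    // Stand-in for the real deserializer: decodes one int per 4 bytes.
    public static List<int> Deserialize(byte[] bytes)
    {
        var values = new List<int>(bytes.Length / 4);
        for (int i = 0; i + 4 <= bytes.Length; i += 4)
        {
            values.Add(BitConverter.ToInt32(bytes, i));
        }
        return values;
    }

    // Reads fixed-size partitions one after another on the calling thread,
    // then lets the block deserialize them concurrently.
    // Note: disposing the BinaryReader also disposes the source stream.
    public static List<List<int>> Run(Stream source, int partitionSize)
    {
        var block = new TransformBlock<byte[], List<int>>(
            bytes => Deserialize(bytes),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        int posted = 0;
        using (var reader = new BinaryReader(source))
        {
            while (true)
            {
                // Only this thread touches the reader, so the stream
                // position is never changed by a competing operation.
                byte[] bytes = reader.ReadBytes(partitionSize);
                if (bytes.Length == 0) break;
                block.Post(bytes);
                posted++;
            }
        }
        block.Complete();

        // Outputs arrive in the order the partitions were posted.
        var results = new List<List<int>>(posted);
        for (int i = 0; i < posted; i++)
        {
            results.Add(block.Receive());
        }
        return results;
    }
}
```

The design choice is to keep the only stateful resource, the stream position, on a single thread and to hand the block immutable inputs (the byte arrays), so the parallel part has nothing shared to race on.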
