如何标记TPL数据流周期才能完成? [英] How to mark a TPL dataflow cycle to complete?

查看:182
本文介绍了如何标记TPL数据流周期才能完成?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

鉴于TPL数据流以下安装

Given the following setup in TPL dataflow.

var directory = new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles");

var dirBroadcast=new BroadcastBlock<DirectoryInfo>(dir=>dir);

var dirfinder = new TransformManyBlock<DirectoryInfo, DirectoryInfo>((dir) =>
{
    return directory.GetDirectories();

});
var tileFilder = new TransformManyBlock<DirectoryInfo, FileInfo>((dir) =>
{
    return directory.GetFiles();
});
dirBroadcast.LinkTo(dirfinder);
dirBroadcast.LinkTo(tileFilder);
dirfinder.LinkTo(dirBroadcast);

var block = new XYZTileCombinerBlock<FileInfo>(3, (file) =>
{
    var coordinate = file.FullName.Split('\\').Reverse().Take(3).Reverse().Select(s => int.Parse(Path.GetFileNameWithoutExtension(s))).ToArray();
    return XYZTileCombinerBlock<CloudBlockBlob>.TileXYToQuadKey(coordinate[0], coordinate[1], coordinate[2]);
},
(quad) =>
    XYZTileCombinerBlock<FileInfo>.QuadKeyToTileXY(quad,
        (z, x, y) => new FileInfo(Path.Combine(directory.FullName,string.Format("{0}/{1}/{2}.png", z, x, y)))),
    () => new TransformBlock<string, string>((s) =>
    {
        Trace.TraceInformation("Combining {0}", s);
        return s;
    }));

tileFilder.LinkTo(block);


using (new TraceTimer("Time"))
{
    dirBroadcast.Post(directory);

    block.LinkTo(new ActionBlock<FileInfo>((s) =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);

    }));
    block.Complete();
    block.Completion.Wait();

}



我想知道我怎么可以标记这是因为完成的周期。目录发布到dirBroadcast广播其职位,可能会回发新迪尔斯的广播时,dirfinder,所以我不能简单地将其标记为,因为它完全会阻止从dirfinder添加任何目录。我应该重新设计,以保持迪尔斯数量的轨迹或有在TPL这个事情。

i am wondering how I can mark this to complete because of the cycle. A directory is posted to the dirBroadcast broadcaster which posts to the dirfinder that might post back new dirs to the broadcaster, so i cant simply mark it as complete because it would block any directories being added from the dirfinder. Should i redesign it to keep track of the number of dirs or is there anything for this in TPL.

推荐答案

如果目的的你的代码是使用某种并行的话,我就不会使用TPL数据流,并建议使用微软的反应,而不是框架遍历目录结构。我认为它变得简单多了。

If the purpose of your code is to traverse the directory structure using some sort of parallelism then I would suggest not using TPL Dataflow and use Microsoft's Reactive Framework instead. I think it becomes much simpler.

下面是我会怎么做。

首先定义一个递归函数建立目录列表:

First define a recursive function to build the list of directories:

Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = di =>
    Observable
        .Return(di)
        .Concat(di.GetDirectories()
            .ToObservable()
            .SelectMany(di2 => recurse(di2)))
        .ObserveOn(Scheduler.Default);

这执行目录的递归并使用默认接收调度,导致可观察到的并行运行

This performs the recurse of the directories and uses the default Rx scheduler which causes the observable to run in parallel.

所以,通过调用递归与输入的DirectoryInfo 我得到输入目录及其所有后代的一个观察的名单

So by calling recurse with an input DirectoryInfo I get an observable list of the input directory and all of its descendants.

现在我可以建立一个非常方便的查询来获得我想要的结果:

Now I can build a fairly straight-forward query to get the results I want:

var query =
    from di in recurse(new DirectoryInfo(@"C:\dev\kortforsyningen_dsm\tiles"))
    from fi in di.GetFiles().ToObservable()
    let zxy =
        fi
            .FullName
            .Split('\\')
            .Reverse()
            .Take(3)
            .Reverse()
            .Select(s => int.Parse(Path.GetFileNameWithoutExtension(s)))
            .ToArray()
    let suffix = String.Format("{0}/{1}/{2}.png", zxy[0], zxy[1], zxy[2])
    select new FileInfo(Path.Combine(di.FullName, suffix));

现在我可以采取行动的查询是这样的:

Now I can action the query like this:

query
    .Subscribe(s =>
    {
        Trace.TraceInformation("Done combining : {0}", s.Name);
    });

现在我可能已经错过了一点点在你的自定义代码,但如果这是你想要的方法拿我敢肯定,你可以很容易地修复任何逻辑的问题。

Now I may have missed a little bit in your custom code but if this is an approach you want to take I'm sure you can fix any logical issues quite easily.

此代码会自动当它运行了孩子的目录和文件的处理完成。

This code automatically handles completion when it runs out of child directories and files.

要接收添加到您的NuGet项目中寻找接收 - 美。

To add Rx to your project look for "Rx-Main" in NuGet.

这篇关于如何标记TPL数据流周期才能完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆