TPL: How do I split and merge the dataflow?

Problem description

I am trying to create a dataflow using TPL with the following form:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

The idea is that GetInputPathsBlock is a block that finds the paths to the input data to be loaded and then sends one path to each LoadDataBlock. The LoadDataBlocks are all identical (except that each has received a unique inputPath string from GetInputPathsBlock). The loaded data is then sent to the ProcessDataBlock, which does some simple processing. The data from each ProcessDataBlock is then sent to MergeDataBlock, which merges it and sends it to SaveDataBlock, which saves it to a file.

Think of it as a dataflow that needs to run for each month. First, the path to each day's data is found. Each day's data is loaded and processed, then merged together for the entire month and saved. The months can run in parallel, the data for each day in a month can be loaded in parallel and processed in parallel (once that day's data has been loaded), and once everything for the month has been loaded and processed, it can be merged and saved.

What I have tried

As far as I can tell, TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and it can be linked to a normal TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to then merge it back into a single block.
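
The split half described above could be wired up roughly as follows. This is only a sketch under stated assumptions: InputData and ProcessedData are placeholder classes, the file paths are invented, and loading is simulated with Task.Delay. The results are simply drained into a list so the example terminates. Note that once several months flow through the same blocks, the items are no longer grouped by month, which is exactly the merging difficulty described here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical placeholder types standing in for the question's InputData and ProcessedData.
public sealed class InputData { public string Path = ""; }
public sealed class ProcessedData { public string Path = ""; public int Length; }

public static class SplitSketch
{
    // Builds GetInputPathsBlock -> LoadDataBlock -> ProcessDataBlock for a single month
    // and returns every processed item. Paths and "processing" are made up.
    public static async Task<List<ProcessedData>> RunMonthAsync(int year, int month)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        // One month in, one path per day out: this is the "split".
        var getInputPathsBlock = new TransformManyBlock<int, string>(m =>
            Enumerable.Range(1, DateTime.DaysInMonth(year, m))
                .Select(day => $"data/{year}-{m:00}-{day:00}.csv"));

        var loadDataBlock = new TransformBlock<string, InputData>(async path =>
        {
            await Task.Delay(10); // Simulated file I/O
            return new InputData { Path = path };
        }, parallel);

        var processDataBlock = new TransformBlock<InputData, ProcessedData>(
            input => new ProcessedData { Path = input.Path, Length = input.Path.Length },
            parallel);

        getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
        loadDataBlock.LinkTo(processDataBlock, linkOptions);

        getInputPathsBlock.Post(month);
        getInputPathsBlock.Complete();

        // Drain the last block so that it can complete.
        var results = new List<ProcessedData>();
        while (await processDataBlock.OutputAvailableAsync())
        {
            while (processDataBlock.TryReceive(out var item))
                results.Add(item);
        }
        await processDataBlock.Completion; // Propagate possible exception
        return results;
    }
}
```

Calling SplitSketch.RunMonthAsync(2020, 2) yields one ProcessedData per day of February 2020.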

What I have looked at

I found this answer, which uses TransformManyBlock to go from an IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.

I have also seen answers like this one, which suggest using JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.

There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how the setup with the dictionary would be transferred to my case.

How do I split and merge the dataflow?

  • Is there a block type I am missing?
  • Can I somehow use TransformManyBlock twice?
  • Does TPL make sense for the split/merge, or is there a simpler async/await way?

Recommended answer

I would use a nested block to avoid splitting my monthly data and then having to merge them again. Here is an example of two nested TransformBlocks that process all days of the year 2020:

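using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Outer block: one input per month, one output (the list of daily results) per month.
var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    // Inner block: created per month, processes that month's days in parallel.
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    // Feed every day of the month, then signal that no more input is coming.
    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    // Collecting all daily results takes the place of a separate merge block.
    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

// Feed the twelve months of 2020 and signal completion.
foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();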

For collecting the daily results of the inner block I used the extension method ToListAsync that is shown below:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}
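
The snippets above stop before the output of monthlyBlock is consumed. One way to finish the pipeline is to link the outer block to a saving ActionBlock, sketched below as a self-contained example. Several details are assumptions made for illustration and are not part of the original answer: the NestedPipeline class, the (Month, Days) tuple output, the in-memory list standing in for writing a file, the shortened delay, and MaxDegreeOfParallelism = 2 on the outer block.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Same helper as in the answer: drains a block into a list until it completes.
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
        CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceive(out var item))
                list.Add(item);
        }
        await block.Completion.ConfigureAwait(false); // Propagate possible exception
        return list;
    }
}

public static class NestedPipeline
{
    // Runs the nested pipeline for one year and returns what the simulated
    // save step received: one (month, dayCount) entry per month.
    public static async Task<List<(int Month, int DayCount)>> RunAsync(int year)
    {
        var monthlyBlock = new TransformBlock<int, (int Month, List<string> Days)>(async month =>
        {
            var dailyBlock = new TransformBlock<int, string>(async day =>
            {
                await Task.Delay(10); // Simulated load + process of one day
                return $"{year}-{month:00}-{day:00}";
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(year, month)))
                await dailyBlock.SendAsync(day);
            dailyBlock.Complete();

            var days = await dailyBlock.ToListAsync();
            return (month, days);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); // Assumed value

        // Stand-in for SaveDataBlock. An ActionBlock processes one item at a time
        // by default, so adding to a plain List is safe here.
        var saved = new List<(int Month, int DayCount)>();
        var saveBlock = new ActionBlock<(int Month, List<string> Days)>(
            result => saved.Add((result.Month, result.Days.Count)));

        monthlyBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var month in Enumerable.Range(1, 12))
            await monthlyBlock.SendAsync(month);
        monthlyBlock.Complete();

        await saveBlock.Completion; // Completes once every month has been saved
        return saved;
    }
}
```

Awaiting NestedPipeline.RunAsync(2020) returns twelve entries. Because each month carries its own inner block, the daily results never mix across months, so no separate merge block is needed.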
