Combining dataflow results


Problem description

I am developing a Dataflow pipeline which reads a collection of files and, for each line in each file, performs a series of Dataflow blocks.

After all steps have completed for each line in a file, I want to execute further blocks on the file itself, but I don't know how this is possible.

It is straightforward to split processing via a TransformManyBlock, but how can one then consolidate?
I am used to Apache Camel's Splitter and Aggregator functionality. Or is there a fundamental disconnect between Dataflow's intent and my desired usage?
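For context, the splitting half really is a one-liner with TransformManyBlock; what TPL Dataflow lacks out of the box is the reverse step. A minimal sketch of the split-only direction (the class and method names here are illustrative, not part of any library):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

class SplitOnlyDemo
{
    // Fans one comma-separated line out into its parsed numbers.
    public static List<int> SplitLine(string line)
    {
        var results = new List<int>();

        // TransformManyBlock handles the one-to-many direction natively:
        // one input string produces many int outputs.
        var splitter = new TransformManyBlock<string, int>(
            s => s.Split(',').Select(int.Parse));
        var collector = new ActionBlock<int>(n => results.Add(n));
        splitter.LinkTo(collector,
            new DataflowLinkOptions { PropagateCompletion = true });

        splitter.Post(line);
        splitter.Complete();
        collector.Completion.Wait();

        // Once items are fanned out, however, no built-in block remembers
        // which details belong to which original input - hence the
        // aggregation question.
        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", SplitLine("1,2,3"))); // prints "1,2,3"
    }
}
```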

Recommended answer

A proper implementation of a Splitter and an Aggregator block would be way too complex to implement, and too cumbersome to use. So I came up with a simpler API that encapsulates two blocks, a master block and a detail block. The processing options for each block are different. The master block executes the splitting and the aggregating actions, while the detail block executes the transformation of each detail. The only requirement regarding the two separate sets of options is that the CancellationToken must be the same for both. All other options (MaxDegreeOfParallelism, BoundedCapacity, EnsureOrdered, TaskScheduler etc.) can be set independently for each block.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, Task<IEnumerable<TDetail>>> split,
    Func<TDetail, Task<TDetailResult>> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    if (split == null) throw new ArgumentNullException(nameof(split));
    if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
    if (transformDetail == null)
        throw new ArgumentNullException(nameof(transformDetail));
    splitAggregateOptions = splitAggregateOptions ??
        new ExecutionDataflowBlockOptions();
    var cancellationToken = splitAggregateOptions.CancellationToken;
    transformDetailOptions = transformDetailOptions ??
        new ExecutionDataflowBlockOptions() { CancellationToken = cancellationToken };
    if (transformDetailOptions.CancellationToken != cancellationToken)
        throw new ArgumentException("Incompatible options",
            nameof(transformDetailOptions));

    var detailTransformer = new ActionBlock<Task<Task<TDetailResult>>>(async task =>
    {
        try
        {
            task.RunSynchronously();
            await task.Unwrap().ConfigureAwait(false);
        }
        catch { } // Suppress exceptions (errors are propagated through the task)
    }, transformDetailOptions);

    return new TransformBlock<TInput, TOutput>(async item =>
    {
        IEnumerable<TDetail> details = await split(item); //continue on captured context
        TDetailResult[] detailResults = await Task.Run(async () =>
        {
            var tasks = new List<Task<TDetailResult>>();
            foreach (var detail in details)
            {
                var taskFactory = new Task<Task<TDetailResult>>(
                    () => transformDetail(detail), cancellationToken);
                var accepted = await detailTransformer.SendAsync(taskFactory,
                    cancellationToken).ConfigureAwait(false);
                if (!accepted)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    throw new InvalidOperationException("Unexpected detail rejection.");
                }
                var task = taskFactory.Unwrap();
                // Assume that the detailTransformer will never fail, and so the task
                // will eventually complete. Guarding against this unlikely scenario
                // with Task.WhenAny(task, detailTransformer.Completion) seems overkill.
                tasks.Add(task);
            }
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }); // continue on captured context
        return aggregate(item, detailResults);
    }, splitAggregateOptions);
}

// Overload with synchronous lambdas
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, IEnumerable<TDetail>> split,
    Func<TDetail, TDetailResult> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    return CreateSplitterAggregatorBlock(
        item => Task.FromResult(split(item)),
        detail => Task.FromResult(transformDetail(detail)),
        aggregate, splitAggregateOptions, transformDetailOptions);
}

Below is a usage example of this block. The input is strings containing comma-separated numbers. Each string is split, then each number is doubled, and finally the doubled numbers of each input string are summed.

var processor = CreateSplitterAggregatorBlock<string, int, int, int>(split: str =>
{
    var parts = str.Split(',');
    return parts.Select(part => Int32.Parse(part));
}, transformDetail: number =>
{
    return number * 2;
}, aggregate: (str, numbersArray) =>
{
    var sum = numbersArray.Sum();
    Console.WriteLine($"[{str}] => {sum}");
    return sum;
});

processor.LinkTo(DataflowBlock.NullTarget<int>());
processor.Post("1, 2, 3");
processor.Post("4, 5");
processor.Post("6, 7, 8, 9");
processor.Complete();
processor.Completion.Wait();

Output:

[1, 2, 3] => 12
[4, 5] => 18
[6, 7, 8, 9] => 60

