Combining dataflow results


Problem description

I am developing a Dataflow pipeline which reads a collection of files and, for each line in each file, performs a series of Dataflow blocks.

After all steps have completed for each line in a file, I want to execute further blocks on the file itself, but I don't know how this is possible.

It is straightforward to split processing via a TransformManyBlock, but how can one then consolidate?
I am used to Apache Camel's Splitter and Aggregator functionality - or is there a fundamental disconnect between Dataflow's intent and my desired usage?
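For context, the "straightforward" split half mentioned above can be sketched as follows (a minimal, made-up illustration; the block names and input are invented for this sketch and are not from the question):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class SplitSketch
{
    static void Main()
    {
        // Splitting fans one input out into many: one "file" in, N lines out.
        var splitter = new TransformManyBlock<string, string>(
            file => file.Split('\n'));

        var perLine = new ActionBlock<string>(
            line => Console.WriteLine($"line: {line}"));

        splitter.LinkTo(perLine,
            new DataflowLinkOptions { PropagateCompletion = true });

        splitter.Post("a\nb\nc");
        splitter.Complete();
        perLine.Completion.Wait();

        // The missing half: once the lines are in flight, no built-in block
        // remembers which file each line came from, so there is nothing
        // obvious to join on for a per-file aggregation step afterwards.
    }
}
```

The sketch shows why aggregation is the hard part: TransformManyBlock discards the one-to-many relationship as soon as the items are emitted.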

Answer

A proper implementation of a Splitter and an Aggregator block would be way too complex to implement, and too cumbersome to use. So I came up with a simpler API, that encapsulates two blocks, a master block and a detail block. The processing options for each block are different. The master block executes the splitting and the aggregating actions, while the detail block executes the transformation of each detail. The only requirement regarding the two separate sets of options is that the CancellationToken must be the same for both. All other options (MaxDegreeOfParallelism, BoundedCapacity, EnsureOrdered, TaskScheduler etc.) can be set independently for each block.

public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, Task<IEnumerable<TDetail>>> split,
    Func<TDetail, Task<TDetailResult>> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    if (split == null) throw new ArgumentNullException(nameof(split));
    if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
    if (transformDetail == null)
        throw new ArgumentNullException(nameof(transformDetail));
    splitAggregateOptions = splitAggregateOptions ??
        new ExecutionDataflowBlockOptions();
    var cancellationToken = splitAggregateOptions.CancellationToken;
    transformDetailOptions = transformDetailOptions ??
        new ExecutionDataflowBlockOptions() { CancellationToken = cancellationToken };
    if (transformDetailOptions.CancellationToken != cancellationToken)
        throw new ArgumentException("Incompatible options",
            nameof(transformDetailOptions));

    var detailTransformer = new ActionBlock<Task<Task<TDetailResult>>>(async task =>
    {
        try
        {
            task.RunSynchronously();
            await task.Unwrap().ConfigureAwait(false);
        }
        catch { } // Suppress exceptions (errors are propagated through the task)
    }, transformDetailOptions);

    return new TransformBlock<TInput, TOutput>(async item =>
    {
        IEnumerable<TDetail> details = await split(item); //continue on captured context
        TDetailResult[] detailResults = await Task.Run(async () =>
        {
            var tasks = new List<Task<TDetailResult>>();
            foreach (var detail in details)
            {
                var taskFactory = new Task<Task<TDetailResult>>(
                    () => transformDetail(detail), cancellationToken);
                var accepted = await detailTransformer.SendAsync(taskFactory,
                    cancellationToken).ConfigureAwait(false);
                if (!accepted)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    throw new InvalidOperationException("Unexpected detail rejection.");
                }
                var task = taskFactory.Unwrap();
                // Assume that the detailTransformer will never fail, and so the task
                // will eventually complete. Guarding against this unlikely scenario
                // with Task.WhenAny(task, detailTransformer.Completion) seems overkill.
                tasks.Add(task);
            }
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }); // continue on captured context
        return aggregate(item, detailResults);
    }, splitAggregateOptions);
}

// Overload with synchronous lambdas
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, IEnumerable<TDetail>> split,
    Func<TDetail, TDetailResult> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    return CreateSplitterAggregatorBlock(
        item => Task.FromResult(split(item)),
        detail => Task.FromResult(transformDetail(detail)),
        aggregate, splitAggregateOptions, transformDetailOptions);
}
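The key trick in the implementation above is the use of "cold" tasks: each Task<Task<TDetailResult>> is constructed but not started, sent to the detail ActionBlock, and only started (via RunSynchronously) inside that block, so the detail block's options govern when the transformation runs, while the producer awaits the very same task object for the result. A minimal standalone sketch of that pattern (class and variable names are illustrative):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ColdTaskDemo
{
    static void Main()
    {
        // A cold task: created but not started. Whoever calls
        // RunSynchronously decides where and when the work executes.
        var cold = new Task<int>(() => 21 * 2);

        // Hand the cold task to an ActionBlock; the block's worker starts
        // it, so the block's options (parallelism, capacity) govern the
        // execution of the work.
        var runner = new ActionBlock<Task<int>>(t => t.RunSynchronously());
        runner.Post(cold);
        runner.Complete();

        // The producer can still block (or await) on the same task object.
        Console.WriteLine(cold.Result);  // 42
        runner.Completion.Wait();
    }
}
```

This is how the answer decouples *where* a detail transformation runs (inside detailTransformer, under transformDetailOptions) from *who* consumes its result (the master block, which awaits the unwrapped task).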

Below is a usage example of this block. The input is strings containing comma-separated numbers. Each string is split, then each number is doubled, and finally the doubled numbers of each input string are summed.

var processor = CreateSplitterAggregatorBlock<string, int, int, int>(split: str =>
{
    var parts = str.Split(',');
    return parts.Select(part => Int32.Parse(part));
}, transformDetail: number =>
{
    return number * 2;
}, aggregate: (str, numbersArray) =>
{
    var sum = numbersArray.Sum();
    Console.WriteLine($"[{str}] => {sum}");
    return sum;
});

processor.Post("1, 2, 3");
processor.Post("4, 5");
processor.Post("6, 7, 8, 9");
processor.Complete();
processor.LinkTo(DataflowBlock.NullTarget<int>());
processor.Completion.Wait();

Output:

[1, 2, 3] => 12
[4, 5] => 18
[6, 7, 8, 9] => 60

