How to wait until an item goes through the pipeline?

Question

I'm trying to wrap my head around Microsoft's Dataflow library. I've built a very simple pipeline consisting of just two blocks:

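// Convert and Consume are placeholder delegates; the original snippet omitted them.
var start = new TransformBlock<Foo, Bar>(foo => Convert(foo));
var end = new ActionBlock<Bar>(bar => Consume(bar));
start.LinkTo(end);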

Now I can asynchronously process Foo instances by calling:

start.SendAsync(new Foo());

What I do not understand is how to do the processing synchronously when needed. I thought that waiting on SendAsync would be enough:

start.SendAsync(new Foo()).Wait();

But apparently it returns as soon as the item is accepted by the first block in the pipeline, not when the item has been fully processed. So is there a way to wait until a given item has been processed by the last (end) block, apart from passing a WaitHandle through the entire pipeline?
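The behaviour described above can be reproduced deterministically. The following is a minimal sketch, not code from the original question: the `SendAsyncDemo` class, the `gate` task and the `int`/`string` item types are assumptions chosen for illustration. It shows that `SendAsync` completes once the first block has accepted the item, while the last block has not yet seen it.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name and item types are illustrative only.
public static class SendAsyncDemo
{
    public static async Task<(bool Accepted, bool ProcessedEarly, bool ProcessedLate)> RunAsync()
    {
        // The gate keeps the first block busy until we release it explicitly.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        bool processed = false;

        var start = new TransformBlock<int, string>(async n =>
        {
            await gate.Task;              // stands in for slow work
            return n.ToString();
        });
        var end = new ActionBlock<string>(s => { processed = true; });
        start.LinkTo(end, new DataflowLinkOptions { PropagateCompletion = true });

        // Completes as soon as 'start' accepts the item into its input buffer.
        bool accepted = await start.SendAsync(42);
        bool processedEarly = processed;  // 'end' cannot have run: the gate is still closed

        gate.SetResult(true);             // let the item flow on
        start.Complete();                 // no more input
        await end.Completion;             // waits for the whole pipeline to drain
        bool processedLate = processed;

        return (accepted, processedEarly, processedLate);
    }
}
```

Note that `Complete()` followed by awaiting `end.Completion` waits for every item in the pipeline and shuts it down, so it does not answer the question of waiting for one given item in a pipeline that keeps running.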

Recommended answer

In short, that's not supported out of the box in Dataflow. Essentially, what you need to do is tag the data so you can retrieve it when processing is done. I've written up a way to do this that lets the consumer await a Job as it gets processed by the pipeline. The only concession to pipeline design is that each block takes a KeyValuePair<Guid, T>. This is the basic JobManager, which I also wrote a post about. Note that the code in the post is a bit dated and needs some updates, but it should get you going in the right direction.

namespace ConcurrentFlows.DataflowJobs {
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// A generic interface defining that:
    /// for a specified input type => an awaitable result is produced.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public interface IJobManager<TInput, TOutput> {
        Task<TOutput> SubmitRequest(TInput data);
    }

    /// <summary>
    /// A TPL-Dataflow based job manager.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {

        /// <summary>
        /// It is anticipated that jobHandler is an injected
        /// singleton instance of a Dataflow based 'calculator', though this implementation
        /// does not depend on it being a singleton.
        /// </summary>
        /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
        public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
            if (jobHandler == null) { throw new ArgumentNullException(nameof(jobHandler)); }

            // Assign the constructor parameter; assigning the property to itself would leave it null.
            this.JobHandler = jobHandler;
            if (!alreadyLinked) {
                JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                alreadyLinked = true;
            }
        }

        private static bool alreadyLinked = false;            

        /// <summary>
        /// Submits the request to the JobHandler and asynchronously awaits the result.
        /// </summary>
        /// <param name="data">The input data to be processed.</param>
        /// <returns>A task that completes with the result once the pipeline has processed the data.</returns>
        public async Task<TOutput> SubmitRequest(TInput data) {
            var taggedData = TagInputData(data);
            var job = CreateJob(taggedData);
            Jobs.TryAdd(job.Key, job.Value);
            await JobHandler.SendAsync(taggedData);
            return await job.Value.Task;
        }

        private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
            get;
        } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();

        private static ExecutionDataflowBlockOptions Options {
            get;
        } = GetResultHandlerOptions();

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
            get;
        } = CreateReplyHandler(Options);

        private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
            get;
        }

        private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
            var id = Guid.NewGuid();
            return new KeyValuePair<Guid, TInput>(id, data);
        }

        private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
            var id = taggedData.Key;
            var jobCompletionSource = new TaskCompletionSource<TOutput>();
            return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
        }

        private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
            return new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1000
            };
        }

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
            return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
                RecieveOutput(result);
            }, options);
        }

        private static void RecieveOutput(KeyValuePair<Guid, TOutput> result) {
            var jobId = result.Key;
            TaskCompletionSource<TOutput> jobCompletionSource;
            if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
                throw new InvalidOperationException($"The jobId: {jobId} was not found.");
            }
            var resultValue = result.Value;
            jobCompletionSource.SetResult(resultValue);            
        }
    }
}
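To see the tagging idea in isolation, here is a compact, self-contained sketch of the same technique. It is not the answer's JobManager: the `AwaitablePipeline` class, its `ProcessAsync` method and the `work` delegate are hypothetical names introduced for illustration, and the two-block pipeline is an assumption modelled on the question. Each item is tagged with a Guid, a TaskCompletionSource is stored under that Guid, and the last block completes it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper; not part of TPL Dataflow or of the JobManager above.
public sealed class AwaitablePipeline<TIn, TOut>
{
    // One pending TaskCompletionSource per item still inside the pipeline.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TOut>> pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<TOut>>();

    private readonly ITargetBlock<KeyValuePair<Guid, TIn>> head;

    public AwaitablePipeline(Func<TIn, TOut> work)
    {
        // First block: does the work and keeps the tag attached to the result.
        var start = new TransformBlock<KeyValuePair<Guid, TIn>, KeyValuePair<Guid, TOut>>(
            item => new KeyValuePair<Guid, TOut>(item.Key, work(item.Value)));

        // Last block: finds the waiting caller by tag and completes its task.
        var end = new ActionBlock<KeyValuePair<Guid, TOut>>(item =>
        {
            if (pending.TryRemove(item.Key, out var tcs))
            {
                tcs.TrySetResult(item.Value);
            }
        });

        start.LinkTo(end, new DataflowLinkOptions { PropagateCompletion = true });
        head = start;
    }

    public async Task<TOut> ProcessAsync(TIn input)
    {
        var id = Guid.NewGuid();
        var tcs = new TaskCompletionSource<TOut>(TaskCreationOptions.RunContinuationsAsynchronously);
        pending[id] = tcs;

        // SendAsync yields false when the block declines the item (for example after Complete()).
        if (!await head.SendAsync(new KeyValuePair<Guid, TIn>(id, input)))
        {
            pending.TryRemove(id, out _);
            throw new InvalidOperationException("The pipeline declined the item.");
        }

        // Completes only after the last block has handled this particular item.
        return await tcs.Task;
    }
}
```

A caller would write `string result = await pipeline.ProcessAsync(21);`, or block with `pipeline.ProcessAsync(21).GetAwaiter().GetResult()` when synchronous processing is really needed. One limitation of this sketch: if `work` throws, the first block faults and the waiting task is never completed, so a production version would need to route exceptions to the pending TaskCompletionSource as well.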
