如何发出数据流完成的信号? [英] How do I signal completion of my dataflow?

查看:31
本文介绍了如何发出数据流完成的信号?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 TPL 数据流实现由 3 个步骤组成的数据流的类.

I've got a class the implements a dataflow composed of 3 steps using TPL Dataflow.

在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo 将它们链接起来,DataflowLinkOptions.PropagateCompletion 设置为 true.该类公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流.该方法返回工作流最后一步的完成"属性.

In the constructor I create the steps as TransformBlocks and link them up using LinkTo with DataflowLinkOptions.PropagateCompletion set to true. The class exposes a single method which kicks of the workflow by calling SendAsync on the 1st step. The method returns the "Completion" property of the final step of the workflow.

目前,工作流中的步骤似乎按预期执行,但最后一步永远不会完成,除非我明确调用 Complete.但是这样做会缩短工作流程并且没有执行任何步骤?我做错了什么?

At the moment the steps in the workflow appear to execute as expected but final step never completes unless I explicitly call Complete on it. But doing that short-circuits the workflow and none of the steps are executed? What am I doing wrong?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;

   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });

      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }

   public Task Push(object message) {
      step1.SendAsync(message);
      step1.Complete();
      return step3.Completion;
   }
}
...
public class Program {
  public static void Main(string[] args) {
    var pipeline = new MessagePipeline();
    var result = pipeline.Push("Hello, world!");
    result.ContinueWith(_ => Console.WriteLine("Completed"));
    Console.ReadLine();
  }
}

推荐答案

当您链接步骤时,您需要通过 PropagateCompletion 属性设置为 true 以传播完成和错误.一旦你这样做了,在第一个块上调用 Complete() 会将完成传播到下游块.

When you link the steps, you need to pass a DataflowLinkOptions with the the PropagateCompletion property set to true to propagate both completion and errors. Once you do that, calling Complete() on the first block will propagete completion to downstream blocks.

一旦块收到完成事件,它就会完成处理,然后通知其链接的下游目标.

Once a block receives the completion event, it finishes processing then notifies its linked downstream targets.

通过这种方式,您可以将所有数据发布到第一步并调用 Complete().只有当所有上游块都完成时,最后一个块才会完成.

This way you can post all your data to the first step and call Complete(). The final block will only complete when all upstream blocks have completed.

例如

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);

foreach(var message in messages)
{
    myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;

PropagateCompletion 默认情况下不是 true,因为在更复杂的场景中(例如非线性流或动态变化的流),您不希望完成和错误自动传播.如果您想在不终止整个流程的情况下处理错误,您可能还想避免自动完成.

PropagateCompletion isn't true by default because in more complex scenarios (eg non-linear flows, or dynamically changing flows) you don't want completion and errors to propagate automatically. You may also want to avoid automatic completion if you want to handle errors without terminating the entire flow.

回到 TPL Dataflow 处于测试阶段时,默认 true,但这在 RTM 上发生了变化

Way back when TPL Dataflow was in beta the default was true but this was changed on RTM

更新

代码永远不会完成,因为最后一步是一个 TransformBlock,没有链接目标来接收其输出.这意味着即使块收到了完成信号,它也没有完成所有的工作,不能改变自己的完成状态.

The code never completes because the final step is a TransformBlock with no linked target to receive its output. This means that even though the block received the completion signal, it hasn't finished all its work and can't change its own Completion status.

将其更改为 ActionBlock 可消除该问题.

Changing it to an ActionBlock<object> removes the issue.

这篇关于如何发出数据流完成的信号?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆