Handle exceptions with TPL Dataflow blocks
Problem description
I have a simple TPL Dataflow pipeline that performs a few tasks. I noticed that when an exception is thrown in any of the blocks, it isn't caught by the initial caller. I added some manual code to check for exceptions, but it doesn't seem like the right approach:
if (readBlock.Completion.Exception != null
    || saveBlockJoinedProcess.Completion.Exception != null
    || processBlock1.Completion.Exception != null
    || processBlock2.Completion.Exception != null)
{
    throw readBlock.Completion.Exception;
}
I looked online for a suggested approach but didn't see anything obvious, so I created the sample code below and am hoping for guidance on a better solution:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //ProcessB();
                ProcessA();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception in Process!");
                throw new Exception($"exception:{e}");
            }
            Console.WriteLine("Processing complete!");
            Console.ReadLine();
        }

        private static void ProcessB()
        {
            Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
        }

        private static void ProcessA()
        {
            var random = new Random();
            var readBlock = new TransformBlock<int, int>(x =>
            {
                try { return DoSomething(x, "readBlock"); }
                catch (Exception) { throw; } // rethrow without resetting the stack trace
            }); //1
            var broadcastBlock = new BroadcastBlock<int>(i => i); // ⬅ Here
            var processBlock1 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(5, "processBlock1")); //2
            var processBlock2 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(2, "processBlock2")); //3
            //var saveBlock =
            //    new ActionBlock<int>(
            //        x => Save(x)); //4
            var saveBlockJoinedProcess =
                new ActionBlock<Tuple<int, int>>(
                    x => SaveJoined(x.Item1, x.Item2)); //4
            var saveBlockJoin = new JoinBlock<int, int>();

            readBlock.LinkTo(broadcastBlock,
                new DataflowLinkOptions { PropagateCompletion = true });
            broadcastBlock.LinkTo(processBlock1,
                new DataflowLinkOptions { PropagateCompletion = true }); //5
            broadcastBlock.LinkTo(processBlock2,
                new DataflowLinkOptions { PropagateCompletion = true }); //6
            processBlock1.LinkTo(saveBlockJoin.Target1); //7
            processBlock2.LinkTo(saveBlockJoin.Target2); //8
            saveBlockJoin.LinkTo(saveBlockJoinedProcess,
                new DataflowLinkOptions { PropagateCompletion = true });

            readBlock.Post(1); //10
            //readBlock.Post(2); //10
            Task.WhenAll(processBlock1.Completion, processBlock2.Completion)
                .ContinueWith(_ => saveBlockJoin.Complete());
            readBlock.Complete(); //12
            saveBlockJoinedProcess.Completion.Wait(); //13

            if (readBlock.Completion.Exception != null
                || saveBlockJoinedProcess.Completion.Exception != null
                || processBlock1.Completion.Exception != null
                || processBlock2.Completion.Exception != null)
            {
                throw readBlock.Completion.Exception;
            }
        }

        private static int DoSomething(int i, string method)
        {
            Console.WriteLine($"Do Something, calling method : {method}");
            // Simulated failure; the return below is intentionally unreachable.
            throw new Exception("Fake Exception!");
            return i;
        }

        private static async Task<int> DoSomethingAsync(int i, string method)
        {
            Console.WriteLine($"Do SomethingAsync");
            // Simulated failure; the lines below are intentionally unreachable.
            throw new Exception("Fake Exception!");
            await Task.Delay(new TimeSpan(0, 0, i));
            Console.WriteLine($"Do Something : {i}, calling method : {method}");
            return i;
        }

        private static void Save(int x)
        {
            Console.WriteLine("Save!");
        }

        private static void SaveJoined(int x, int y)
        {
            Thread.Sleep(new TimeSpan(0, 0, 10));
            Console.WriteLine("Save Joined!");
        }
    }
}
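Answer
"I had a look online to see what's a suggested approach but didn't see anything obvious."
If you have a pipeline (more or less), then the common approach is to use PropagateCompletion to shut down the pipe. If you have more complex topologies, then you would need to complete blocks by hand.
In your case, you have an attempted propagation here:
Task.WhenAll(
        processBlock1.Completion,
        processBlock2.Completion)
    .ContinueWith(_ => saveBlockJoin.Complete());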
But this code will not propagate exceptions. When both processBlock1.Completion and processBlock2.Completion complete, saveBlockJoin is completed successfully, even if one of them faulted.
A better solution would be to use await instead of ContinueWith:
async Task PropagateToSaveBlockJoin()
{
    try
    {
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        saveBlockJoin.Complete();
    }
    catch (Exception ex)
    {
        ((IDataflowBlock)saveBlockJoin).Fault(ex);
    }
}
_ = PropagateToSaveBlockJoin();
Using await encourages you to handle exceptions, which you can do by passing them to Fault to propagate the exception. The cast to IDataflowBlock is needed because the built-in blocks implement Fault explicitly.
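To make the difference concrete, here is a minimal, self-contained sketch that runs a reduced version of the join topology twice: once with ContinueWith plus Complete, once with await plus Fault. The names FaultPropagationDemo, CompleteOrFaultAsync, RunAsync, worker1, worker2, join and save are hypothetical and are not from the question's code. It assumes C# 7.1 or later (for async Main) and that the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original question or answer.
public static class FaultPropagationDemo
{
    // Awaits the upstream completions, then completes or faults the target.
    private static async Task CompleteOrFaultAsync(IDataflowBlock target, params Task[] upstream)
    {
        try
        {
            await Task.WhenAll(upstream);
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex); // forward the failure instead of hiding it
        }
    }

    // Runs a two-worker pipeline whose first worker always throws and
    // returns the final status of the last block's Completion task.
    public static async Task<TaskStatus> RunAsync(bool propagateFault)
    {
        Func<int, int> failing = x => throw new InvalidOperationException("Fake exception!");
        var worker1 = new TransformBlock<int, int>(failing);
        var worker2 = new TransformBlock<int, int>(x => x * 2);
        var join = new JoinBlock<int, int>();
        var save = new ActionBlock<Tuple<int, int>>(pair =>
            Console.WriteLine($"Saved {pair.Item1}, {pair.Item2}"));

        worker1.LinkTo(join.Target1);
        worker2.LinkTo(join.Target2);
        join.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        Task[] upstream = { worker1.Completion, worker2.Completion };
        Task propagation = propagateFault
            ? CompleteOrFaultAsync(join, upstream)
            : Task.WhenAll(upstream).ContinueWith(_ => join.Complete());

        worker1.Post(1);
        worker2.Post(1);
        worker1.Complete();
        worker2.Complete();

        await propagation;
        // Task.WhenAny waits for completion without rethrowing a fault.
        await Task.WhenAny(save.Completion);
        return save.Completion.Status;
    }

    public static async Task Main()
    {
        TaskStatus withContinueWith = await RunAsync(propagateFault: false);
        TaskStatus withFault = await RunAsync(propagateFault: true);
        Console.WriteLine($"ContinueWith + Complete: {withContinueWith}");
        Console.WriteLine($"await + Fault: {withFault}");
    }
}
```

With the ContinueWith variant the final block should report RanToCompletion even though worker1 failed. With the Fault variant it should report Faulted, so a caller that waits on or awaits the last block's Completion sees the exception. Propagated exceptions may arrive nested inside AggregateException, so calling Flatten() on Completion.Exception before inspecting it is a reasonable precaution.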