Handle exceptions with TPL Dataflow blocks

Problem description

I have a simple TPL Dataflow pipeline that basically runs a few tasks. I noticed that when an exception is thrown in any of the blocks, it isn't caught by the initial caller. I added some manual code to check for exceptions, but that doesn't seem like the right approach.

if (readBlock.Completion.Exception != null
    || saveBlockJoinedProcess.Completion.Exception != null
    || processBlock1.Completion.Exception != null
    || processBlock2.Completion.Exception != null)
{
    throw readBlock.Completion.Exception;
}

I looked online for a suggested approach but didn't find anything obvious. So I created the sample code below, hoping to get some guidance on a better solution:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //ProcessB();
                ProcessA();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception in Process!");
                throw new Exception($"exception:{e}");
            }
            Console.WriteLine("Processing complete!");
            Console.ReadLine();
        }

        private static void ProcessB()
        {
            Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
        }

        private static void ProcessA()
        {
            var random = new Random();
            var readBlock = new TransformBlock<int, int>(x =>
            {
                try { return DoSomething(x, "readBlock"); }
                catch (Exception) { throw; } // rethrow without resetting the stack trace
            }); //1

            var braodcastBlock = new BroadcastBlock<int>(i => i); // ⬅ Here

            var processBlock1 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(5, "processBlock1")); //2
            var processBlock2 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(2, "processBlock2")); //3

            //var saveBlock =
            //    new ActionBlock<int>(
            //    x => Save(x)); //4

            var saveBlockJoinedProcess =
                new ActionBlock<Tuple<int, int>>(
                x => SaveJoined(x.Item1, x.Item2)); //4

            var saveBlockJoin = new JoinBlock<int, int>();

            readBlock.LinkTo(braodcastBlock, new DataflowLinkOptions
                { PropagateCompletion = true });

            braodcastBlock.LinkTo(processBlock1,
                new DataflowLinkOptions { PropagateCompletion = true }); //5

            braodcastBlock.LinkTo(processBlock2,
                new DataflowLinkOptions { PropagateCompletion = true }); //6


            processBlock1.LinkTo(
                saveBlockJoin.Target1); //7

            processBlock2.LinkTo(
                saveBlockJoin.Target2); //8

            saveBlockJoin.LinkTo(saveBlockJoinedProcess,
                new DataflowLinkOptions { PropagateCompletion = true });

            readBlock.Post(1); //10
            //readBlock.Post(2); //10

            Task.WhenAll(processBlock1.Completion,processBlock2.Completion)
                .ContinueWith(_ => saveBlockJoin.Complete());

            readBlock.Complete(); //12
            saveBlockJoinedProcess.Completion.Wait(); //13
            if (readBlock.Completion.Exception != null
                || saveBlockJoinedProcess.Completion.Exception != null
                || processBlock1.Completion.Exception != null
                || processBlock2.Completion.Exception != null)
            {
                throw readBlock.Completion.Exception;
            }
        }
        private static int DoSomething(int i, string method)
        {
            Console.WriteLine($"Do Something, calling method : {method}");
            throw new Exception("Fake Exception!");
            return i;
        }
        private static async Task<int> DoSomethingAsync(int i, string method)
        {
            Console.WriteLine($"Do SomethingAsync");
            throw new Exception("Fake Exception!");
            await Task.Delay(new TimeSpan(0, 0, i));
            Console.WriteLine($"Do Something : {i}, calling method : {method}");
            return i;
        }
        private static void Save(int x)
        {

            Console.WriteLine("Save!");
        }
        private static void SaveJoined(int x, int y)
        {
            Thread.Sleep(new TimeSpan(0, 0, 10));
            Console.WriteLine("Save Joined!");
        }
    }
}

Solution

"I had a look online to see what's a suggested approach but didn't see anything obvious."

If you have a (more or less) linear pipeline, then the common approach is to set PropagateCompletion on each link so that completion, including faults, flows down the pipe. If you have a more complex topology, then you need to complete the blocks by hand.
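As a rough illustration of the linear case, the sketch below links three blocks with PropagateCompletion and awaits only the last one. The class name LinearPipelineDemo, the method RunAsync, and the block names are hypothetical and not part of the question's code. A propagated fault may arrive wrapped in one or more AggregateException layers, so the sketch reads the root cause with GetBaseException().

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: a linear read -> process -> save pipeline.
public static class LinearPipelineDemo
{
    // Returns "completed" if the pipeline finished cleanly,
    // otherwise the message of the root-cause exception.
    public static async Task<string> RunAsync()
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var read = new TransformBlock<int, int>(x => x);
        var process = new TransformBlock<int, int>(x =>
        {
            if (x == 1) throw new InvalidOperationException("Fake Exception!");
            return x;
        });
        var save = new ActionBlock<int>(x => Console.WriteLine($"Saved {x}"));

        read.LinkTo(process, link);
        process.LinkTo(save, link);

        read.Post(1);
        read.Complete();

        try
        {
            // Awaiting the last block is enough here: a fault in an upstream
            // block travels down the links and faults this task as well.
            await save.Completion;
            return "completed";
        }
        catch (Exception ex)
        {
            // Unwrap any AggregateException layers added during propagation.
            return ex.GetBaseException().Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

In a console application the caller only needs a single try/catch around the final block's Completion, which is what the manual checks in the question were trying to emulate.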

In your case, you have an attempted propagation here:

Task.WhenAll(
    processBlock1.Completion,
    processBlock2.Completion)
    .ContinueWith(_ => saveBlockJoin.Complete());

But this code will not propagate exceptions. When both processBlock1.Completion and processBlock2.Completion complete, whether successfully or faulted, the continuation calls saveBlockJoin.Complete(), so saveBlockJoin always completes successfully.
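The effect can be observed in isolation. The following is a minimal sketch, assuming two failing TransformBlocks feeding a JoinBlock as in the question; the class name ContinueWithDemo and the helper Fail are hypothetical. It reports the final status of one upstream block and of the join.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: shows that ContinueWith + Complete() hides the fault.
public static class ContinueWithDemo
{
    // Helper with an int return type so the lambda below binds to Func<int, int>.
    private static int Fail(int x) => throw new InvalidOperationException("Fake Exception!");

    public static async Task<(TaskStatus Process1, TaskStatus Join)> RunAsync()
    {
        var process1 = new TransformBlock<int, int>(x => Fail(x));
        var process2 = new TransformBlock<int, int>(x => Fail(x));
        var join = new JoinBlock<int, int>();

        process1.LinkTo(join.Target1);
        process2.LinkTo(join.Target2);

        // Same shape as the question: the continuation ignores its antecedent,
        // so it runs and calls Complete() even when WhenAll has faulted.
        Task propagation = Task.WhenAll(process1.Completion, process2.Completion)
            .ContinueWith(_ => join.Complete());

        process1.Post(1);
        process2.Post(1);
        process1.Complete();
        process2.Complete();

        await propagation;
        await join.Completion; // does not throw: the join was completed, not faulted

        return (process1.Completion.Status, join.Completion.Status);
    }

    public static async Task Main()
    {
        var (process1, join) = await RunAsync();
        Console.WriteLine($"process1: {process1}, join: {join}");
    }
}
```

The upstream block ends up Faulted while the join ends up RanToCompletion, which is why waiting on the final block in the question never surfaced the exception.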

A better solution would be to use await instead of ContinueWith:

async Task PropagateToSaveBlockJoin()
{
    try
    {
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        saveBlockJoin.Complete();
    }
    catch (Exception ex)
    {
        ((IDataflowBlock)saveBlockJoin).Fault(ex);
    }
}
_ = PropagateToSaveBlockJoin();

Using await encourages you to handle exceptions. Here you handle them by passing the exception to Fault, which faults saveBlockJoin; from there PropagateCompletion carries the fault on to saveBlockJoinedProcess.
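Putting the pieces together, here is a hedged end-to-end sketch of that approach, not the asker's actual program. The class FaultPropagationDemo and the helpers Fail and PropagateAsync are hypothetical names. The helper takes the join as IDataflowBlock, which avoids the cast that the snippet above needs in order to call Fault on a JoinBlock.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: manual fault propagation into a JoinBlock.
public static class FaultPropagationDemo
{
    private static int Fail(int x) => throw new InvalidOperationException("Fake Exception!");

    // Completes the join if both upstream blocks succeeded, faults it otherwise.
    private static async Task PropagateAsync(Task upstream1, Task upstream2, IDataflowBlock join)
    {
        try
        {
            await Task.WhenAll(upstream1, upstream2);
            join.Complete();
        }
        catch (Exception ex)
        {
            join.Fault(ex);
        }
    }

    // Returns "completed" on success, otherwise the root-cause message.
    public static async Task<string> RunAsync()
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var process1 = new TransformBlock<int, int>(x => Fail(x));
        var process2 = new TransformBlock<int, int>(x => x);
        var join = new JoinBlock<int, int>();
        var save = new ActionBlock<Tuple<int, int>>(
            t => Console.WriteLine($"Saved {t.Item1}, {t.Item2}"));

        process1.LinkTo(join.Target1);
        process2.LinkTo(join.Target2);
        join.LinkTo(save, link);

        // Fire and forget: the helper handles its own exceptions.
        _ = PropagateAsync(process1.Completion, process2.Completion, join);

        process1.Post(1);
        process2.Post(1);
        process1.Complete();
        process2.Complete();

        try
        {
            await save.Completion;
            return "completed";
        }
        catch (Exception ex)
        {
            // The fault may be wrapped in AggregateException layers on the way down.
            return ex.GetBaseException().Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

With this in place the manual checks of each block's Completion.Exception become unnecessary: waiting on or awaiting the last block is the single point where the caller observes a failure. Note that awaiting Task.WhenAll rethrows only the first exception when several upstream blocks fail.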
