TPL数据流和下游块中的异常处理 [英] TPL Dataflow and exception handling in downstream blocks

查看:306
本文介绍了TPL数据流和下游块中的异常处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下伪代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This is condition is always false
                // t (bufferblock) has no exceptions. Exception is raised in downstream action block where it sends to
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // Neither here the exception is rethrowed
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

管道配置如下 ActionBlock< int> => BufferBlock< INT> => ActionBlock< INT>

最后一个 ActionBlock< int> 引发异常,但不会重新导入源代码块,

The last ActionBlock<int> throws an exception but it is not rethrown to source block where I would like to handle it.

这个代码如何重写,以便正确处理异常?

How this code can be rewritten so it handles exceptions correctly?

推荐答案

您可以找到此主题的官方指南这里。整体解决方案是订阅所有块 完成 任务,检查其状态,并在需要的情况下更换故障块(应存储块的所有引用) 。

You can find the official guidelines for this topic here. Overall solution is to subscribe for all the blocks Completion task with checking the state of it, and, in case of need, replacing the faulted block (one should store all the references for the blocks too). Please refer to whole article to more information.


具有故障的网络的行为

Behaviors of a network with Faulted blocks


  1. 保留的邮件

    为了避免邮件损坏,故障块应清除其消息队列,并尽快转入故障状态
    。有一种情况不符合
    这个规则:持有由目标保留的消息的源块
    如果遇到内部异常的块有一条消息,
    由目标保留,那么保留的消息不能是
    丢弃
    ,块不应该被移动到故障状态
    ,直到消息被释放或消耗。

  1. Reserved Messages
    In order to avoid message corruption, a faulted block should clear its message queues and move into a Faulted state as soon as possible. There is a single scenario that does not obey to this rule: a source block holding a message reserved by a target. If a block that encounters an internal exception has a message that was reserved by a target, the reserved message must not be dropped, and the block should not be moved into the Faulted state until the message is released or consumed.

挂起网络

...

Hanging Networks
...


  • 保留对网络中所有块的引用,并使用 Task.WaitAll Task.WhenAll 等待他们(同步或
    异步)。如果某个程序段出现故障,它的 完成 任务将
    完成故障状态。

  • 使用 DataflowLinkOptions PropagateCompletion == true 建立线性网络时。这将块块完成从
    源扩展到目标。在这种情况下,等待网络
    叶块就足够了。

  • Keep a reference to all the blocks in the network and use Task.WaitAll or Task.WhenAll to wait for them (synchronously or asynchronously). If a block faults, its Completion task will complete in the Faulted state.
  • Use DataflowLinkOptions with PropagateCompletion == true when building a linear network. That will propagate block completion from source to target. In this case it is enough to wait on the network leaf block.


这篇关于TPL数据流和下游块中的异常处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆