具有限制容量的转换块中的TPL数据流异常 [英] TPL Dataflow exception in transform block with bounded capacity

查看:57
本文介绍了具有限制容量的转换块中的TPL数据流异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要构建将处理大量消息的TPL数据流管道.因为有很多消息,所以我不能简单地将它们 Post 放入 BufferBlock 的无限队列中,否则我将面临内存问题.所以我想使用 BoundedCapacity = 1 选项禁用队列,并使用 MaxDegreeOfParallelism 进行并行任务处理,因为我的 TransformBlock 可能需要一些时间每条消息.我还使用 PropagateCompletion 完成所有操作,并且无法沿管道传播.

I need to construct TPL dataflow pipeline which will process a lot of messages. Because there are many messages I can not simply Post them into infinite queue of the BufferBlock or I will face memory issues. So I want to use BoundedCapacity = 1 option to disable the queue and use MaxDegreeOfParallelism to use parallel task processing since my TransformBlocks could take some time for each message. I also use PropagateCompletion to make all completion and fail to propagate down the pipeline.

但是当第一条消息刚发生错误时,我正面临错误处理的问题:调用 await SendAsync 只是将我的应用切换为无限等待状态.

But I'm facing the issue with error handling when error happened just right after the first message: calling await SendAsync simply switch my app into infinite waiting.

我已将案例简化为示例控制台应用程序:

I've simplified my case to sample console app:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;

推荐答案

这是预期的行为.如果出现下游"故障,则该错误不会在网格上向后"传播.网格期望您检测到该故障(例如,通过 process_block.Completion )并解决该问题.

This is expected behavior. If there's a fault "downstream", the error does not propagate "backwards" up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion) and resolve it.

如果您想向后传播错误,则可以在 process_block.Completion 上继续 await

If you want to propagate errors backwards, you could have an await or continuation on process_block.Completion that faults the upstream block(s) if the downstream block(s) fault.

请注意,这不是唯一可行的解​​决方案.您可能需要重建网格的该部分或将源链接到替代目标.源块没有故障,因此它们可以继续使用已修复的网格进行处理.

Note that this is not the only possible solution; you may want to rebuild that part of the mesh or link the sources to an alternative target. The source block(s) have not faulted, so they can just continue processing with a repaired mesh.

这篇关于具有限制容量的转换块中的TPL数据流异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆