TPL 完成 vs 完成 [英] TPL Complete vs Completion
问题描述
我需要从旧数据库中导入与客户相关的数据,并在此过程中执行多项转换.这意味着单个条目需要执行额外的事件"(同步产品、创建发票等).
I need to import customer related data from legacy DB and perform several transformations during the process. This means a single entry needs to perform additional "events" (synchronize products, create invoices, etc.).
我最初的解决方案是一个简单的并行方法.它工作正常,但有时它有问题.如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会卡住并最终超时,导致每个底层事件也失败(它们取决于失败的事件).它不会一直发生,但很烦人.
My initial solution was a simple parallel approach. It works okay, but sometimes it has issues. If the currently processed customers need to wait for the same type of events, their processing queues might got stuck and eventually time out, causing every underlying events to fail too (they depend on the one which failed). It doesn't happen all the time, yet it's annoying.
所以我有了另一个想法,分批工作.我的意思是不仅要限制同时处理的客户数量,还要限制广播到队列的事件数量.在四处寻找想法时,我发现了 this 答案,它指向 TPL 数据流.
So I got another idea, work in batches. I mean not only limiting the number of customers being processed at the same time, but also the number of the events which are broadcasted to the queues. While searching around for ideas, I found this answer, which points to the TPL DataFlow.
我做了一个骨架来熟悉它.我设置了一个简单的管道,但我对 Complete()
的用法和等待 Completion()
的用法有点困惑.
I made a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about the usage of Complete()
and awaiting Completion()
.
步骤如下
- 制作一个数字列表(要导入的客户的 ID) - 这在导入逻辑之外,它只是为了能够触发其余的逻辑
- 创建一个
BatchBlock
(能够限制同时处理的客户数量) - 根据 id 创建单个
MyClass1
项目 (TransformBlock
) - 执行一些逻辑并生成
MyClass2
(TransformManyBlock
)的集合——例如,睡眠1秒 - 对集合的每个项目执行一些逻辑 (
ActionBlock
) - 例如,睡眠 1 秒
- Make a list of numbers (the ids of the customers to be imported) - this is outside the import logic, it just there to be able to trigger the rest of the logic
- Create a
BatchBlock
(to be able to limit the number of customers to be processed at the same time) - Create a single
MyClass1
item based on the id (TransformBlock<int, MyClass1>
) - Perform some logic and generate a collection of
MyClass2
(TransformManyBlock<MyClass1, MyClass2>
) - as example, sleep for 1 second - Perform some logic on every item of the collection (
ActionBlock<MyClass2>
) - as example, sleep for 1 second
完整代码如下:
public static class Program
{
private static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(2);
for (var i = 1; i < 10; i++)
{
batchBlock.Post(i);
}
batchBlock.Complete();
while (batchBlock.TryReceive(null, out var ids))
{
var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
{
Console.WriteLine($"TransformBlock(id: {id})");
return new MyClass1(id, "Star Wars");
});
var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
{
Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
Thread.Sleep(1000);
return GetMyClass22Values(myClass1);
});
var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
{
Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
Thread.Sleep(1000);
});
transformBlock.LinkTo(transformManyBlock);
transformManyBlock.LinkTo(actionBlock);
foreach (var id in ids)
{
transformBlock.Post(id);
}
// this is the point when I'm not 100% sure
//transformBlock.Complete();
//transformManyBlock.Complete();
//transformManyBlock.Completion.Wait();
actionBlock.Complete();
actionBlock.Completion.Wait();
}
Console.WriteLine();
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
{
return new List<MyClass2>
{
new MyClass2(1, myClass1.Id+ " did this"),
new MyClass2(2, myClass1.Id+ " did that"),
new MyClass2(3, myClass1.Id+ " did this again")
};
}
}
public class MyClass1
{
public MyClass1(int id, string value)
{
Id = id;
Value = value;
}
public int Id { get; set; }
public string Value { get; set; }
}
public class MyClass2
{
public MyClass1(int id, string value)
{
Id = id;
Value = value;
}
public int Id { get; set; }
public string Value { get; set; }
}
所以我纠结的重点是结束,我需要调用 Complete()
或等待 Completion
.我似乎无法找到正确的组合.我想看到如下输出:
So the point I struggle with is the end, where I'd need to call Complete()
or wait for Completion
. I can't seem to find the right combination. I'd like to see an output as follows:
TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)
[the rest of the items]
Press any key to exit...
谁能给我指出正确的方向?
Anyone can point me to the right direction?
推荐答案
大功告成,您需要在管道的第一个块上调用 Complete
然后 await
> Completion
在最后一个块上.然后在您的链接中,您需要像这样传播完成:
You're almost there, you need to call Complete
on the first block in the pipeline then await
Completion
on the last block. Then in your links you need to propagate completion like this:
private async static void Main(string[] args) {
var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
{
Console.WriteLine($"TransformBlock(id: {id})");
return new MyClass1(id, "Star Wars");
});
var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
{
Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
Thread.Sleep(1000);
return GetMyClass22Values(myClass1);
});
var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
{
Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
Thread.Sleep(1000);
});
//propagate completion
transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true});
foreach(var id in ids) {
transformBlock.Post(id);
}
//Complete the first block
transformBlock.Complete();
//wait for completion to flow to the last block
await actionBlock.Completion;
}
您还可以将批处理块合并到您的管道中,并消除对 TryRecieve
调用的需要,但这似乎是您流程的另一部分.
You can also incorporate the batch block into your pipeline and remove the need for the TryRecieve
call but that seems like another part of your flow.
编辑
将完成传播到多个块的示例:
Example of propagating completion to multiple blocks:
public async static void Main(string[] args) {
var sourceBlock = new BufferBlock<int>();
var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));
var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));
sourceBlock.LinkTo(processBlock1);
sourceBlock.LinkTo(processBlock2);
var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk => {
if(!tsk.IsFaulted) {
processBlock1.Complete();
processBlock2.Complete();
} else {
((IDataflowBlock)processBlock1).Fault(tsk.Exception);
((IDataflowBlock)processBlock2).Fault(tsk.Exception);
}
});
//Send some data...
sourceBlock.Complete();
await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
}
这篇关于TPL 完成 vs 完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!