TPL Complete vs Completion

Question

I need to import customer-related data from a legacy DB and perform several transformations during the process. This means a single entry needs to trigger additional "events" (synchronize products, create invoices, etc.).

My initial solution was a simple parallel approach. It works okay, but sometimes it has issues. If the customers currently being processed need to wait for the same type of events, their processing queues might get stuck and eventually time out, causing every underlying event to fail too (they depend on the one that failed). It doesn't happen all the time, yet it's annoying.

So I got another idea: work in batches. I mean limiting not only the number of customers being processed at the same time, but also the number of events that are broadcast to the queues. While searching around for ideas, I found this answer, which points to TPL Dataflow.

I made a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about when to call Complete() and when to await Completion.

The steps are as follows:

  1. Make a list of numbers (the ids of the customers to be imported) - this is outside the import logic; it is just there to trigger the rest of the logic
  2. Create a BatchBlock (to be able to limit the number of customers processed at the same time)
  3. Create a single MyClass1 item based on the id (TransformBlock<int, MyClass1>)
  4. Perform some logic and generate a collection of MyClass2 (TransformManyBlock<MyClass1, MyClass2>) - for example, sleep for 1 second
  5. Perform some logic on every item of the collection (ActionBlock<MyClass2>) - for example, sleep for 1 second

The complete code is as follows:

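using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Program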
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });

            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });
            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);
            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure

            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
               {
                   new MyClass2(1, myClass1.Id+ " did this"),
                   new MyClass2(2, myClass1.Id+ " did that"),
                   new MyClass2(3, myClass1.Id+ " did this again")
               };
    }
}

public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

public class MyClass2
{
    public MyClass2(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

So the point I struggle with is the end, where I need to call Complete() or wait for Completion. I can't seem to find the right combination. I'd like to see output like the following:

TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)

[the rest of the items]


Press any key to exit...   

Can anyone point me in the right direction?

Recommended answer

You're almost there. You need to call Complete on the first block in the pipeline, then await Completion on the last block. In your links, you also need to propagate completion, like this:

// async Task (not async void) so the process waits for the pipeline to finish
private static async Task Main(string[] args) {
    var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
    {
        Console.WriteLine($"TransformBlock(id: {id})");
        return new MyClass1(id, "Star Wars");
    });
    var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
    {
        Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
        Thread.Sleep(1000);
        return GetMyClass22Values(myClass1);
    });

    var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
    {
        Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
        Thread.Sleep(1000);
    });

    //propagate completion
    transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true});
    // ids: one batch of customer ids, e.g. received from the BatchBlock as in the question
    foreach (var id in ids) {
        transformBlock.Post(id);
    }


    //Complete the first block
    transformBlock.Complete();

    //wait for completion to flow to the last block
    await actionBlock.Completion;
} 
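
To make the distinction concrete, here is a minimal sketch on a single block. Complete() only signals that no more input will arrive and returns immediately; Completion is a Task that finishes once everything already accepted has been processed. The class name CompleteVsCompletionDemo and the method RunAsync are made up for this illustration and are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
public static class CompleteVsCompletionDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100);   // simulated work
            processed.Add(i);        // safe here: the default MaxDegreeOfParallelism is 1
        });

        for (var i = 1; i <= 3; i++)
        {
            block.Post(i);
        }

        // Signal "no more input". This does not wait for queued items.
        block.Complete();

        // After Complete() the block declines new items, so Post returns false.
        bool accepted = block.Post(4);
        Console.WriteLine($"Post after Complete accepted: {accepted}");

        // Wait until every item accepted before Complete() has been processed.
        await block.Completion;
        return processed;
    }

    public static async Task Main()
    {
        var processed = await RunAsync();
        Console.WriteLine(string.Join(", ", processed));
    }
}
```

Calling Complete() on the last block of a pipeline, as in the question, closes that block before the upstream blocks have delivered their output, which is why the call belongs on the first block with completion propagated along the links.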

You can also incorporate the batch block into your pipeline and remove the need for the TryReceive call, but that seems like another part of your flow.
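
As a rough sketch of that idea, the BatchBlock can be linked directly to a block that consumes int[] batches, so no TryReceive loop is needed. The names BatchPipelineSketch, RunAsync, and batchHandler are assumptions for illustration; the handler here only records each batch, whereas a real import would run the per-batch processing inside it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not the original author's code.
public static class BatchPipelineSketch
{
    public static async Task<int[][]> RunAsync(int count, int batchSize)
    {
        var seen = new ConcurrentQueue<int[]>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var batchBlock = new BatchBlock<int>(batchSize);

        // With default options an ActionBlock handles one message at a time,
        // so each batch is finished before the next one starts.
        var batchHandler = new ActionBlock<int[]>(ids =>
        {
            Console.WriteLine($"Batch: {string.Join(", ", ids)}");
            seen.Enqueue(ids);
        });

        batchBlock.LinkTo(batchHandler, linkOptions);

        for (var i = 1; i <= count; i++)
        {
            batchBlock.Post(i);
        }

        // Completing the BatchBlock also emits any leftover items as a final, smaller batch.
        batchBlock.Complete();

        // Completion flows through the link to the handler.
        await batchHandler.Completion;
        return seen.ToArray();
    }

    public static async Task Main()
    {
        await RunAsync(9, 2);
    }
}
```

With nine ids and a batch size of two, the handler receives four full batches followed by a final batch containing only the ninth id.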

Edit

Example of propagating completion to multiple blocks:

public static async Task Main(string[] args) {

    var sourceBlock = new BufferBlock<int>();

    var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));

    var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));

    sourceBlock.LinkTo(processBlock1);
    sourceBlock.LinkTo(processBlock2);

    var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk => {
        if(!tsk.IsFaulted) {
            processBlock1.Complete();
            processBlock2.Complete();
        } else {
            ((IDataflowBlock)processBlock1).Fault(tsk.Exception);
            ((IDataflowBlock)processBlock2).Fault(tsk.Exception);
        }
    });

    //Send some data...

    sourceBlock.Complete();
    await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
}
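
As a possible alternative to the manual ContinueWith wiring, PropagateCompletion can be set on each link from the source, and completion (or a fault) then flows to every linked target. The sketch below assumes that behaviour of DataflowLinkOptions; the names FanOutSketch and RunAsync are made up for the illustration. Note that a BufferBlock hands each item to the first linked target that accepts it, so the sketch only counts the total number of items handled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not the original author's code.
public static class FanOutSketch
{
    public static async Task<int> RunAsync()
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var handled = 0;

        var sourceBlock = new BufferBlock<int>();

        var processBlock1 = new ActionBlock<int>(i =>
        {
            Interlocked.Increment(ref handled);
            Console.WriteLine($"Block1 {i}");
        });

        var processBlock2 = new ActionBlock<int>(i =>
        {
            Interlocked.Increment(ref handled);
            Console.WriteLine($"Block2 {i}");
        });

        // Each link propagates the source's completion to its own target.
        sourceBlock.LinkTo(processBlock1, linkOptions);
        sourceBlock.LinkTo(processBlock2, linkOptions);

        for (var i = 1; i <= 3; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();

        // Both targets complete once the source has drained and completed.
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        return handled;
    }

    public static async Task Main()
    {
        Console.WriteLine($"Handled: {await RunAsync()}");
    }
}
```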
