TPL完成与完成 [英] TPL Complete vs Completion

查看:70
本文介绍了TPL完成与完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要从旧版数据库导入与客户相关的数据,并在此过程中执行一些转换.这意味着单个条目需要执行其他事件"(同步产品,创建发​​票等).

I need to import customer related data from legacy DB and perform several transformations during the process. This means a single entry needs to perform additional "events" (synchronize products, create invoices, etc.).

我最初的解决方案是一种简单的并行方法. 可以,但是有时会出现问题.如果当前处理的客户需要等待相同类型的事件,则他们的处理队列可能会卡住并最终超时,从而导致每个基础事件也失败(它们取决于失败的事件).它并不会一直发生,但很烦人.

My initial solution was a simple parallel approach. It works okay, but sometimes it has issues. If the currently processed customers need to wait for the same type of events, their processing queues might got stuck and eventually time out, causing every underlying events to fail too (they depend on the one which failed). It doesn't happen all the time, yet it's annoying.

所以我有了另一个想法,分批工作.我的意思是不仅要限制同时处理的客户数量,还要限制广播到队列的事件数量.在寻找想法时,我发现了答案,它指向 TPL DataFlow .

So I got another idea, work in batches. I mean not only limiting the number of customers being processed at the same time, but also the number of the events which are broadcasted to the queues. While searching around for ideas, I found this answer, which points to the TPL DataFlow.

我做了一个骨架来熟悉它.我建立了一个简单的管道,但是对Complete()的使用和等待Completion()的使用感到有些困惑.

I made a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about the usage of Complete() and awaiting Completion().

步骤如下

  1. 列出一个数字列表(要导入的客户的ID)-这在导入逻辑之外,它就可以触发其余的逻辑
  2. 创建BatchBlock(以便限制同时处理的客户数量)
  3. 根据ID(TransformBlock<int, MyClass1>)创建一个MyClass1项目
  4. 执行一些逻辑并生成MyClass2(TransformManyBlock<MyClass1, MyClass2>)的集合-例如,睡眠1秒
  5. 对集合(ActionBlock<MyClass2>)的每个项目执行一些逻辑-例如,睡眠1秒
  1. Make a list of numbers (the ids of the customers to be imported) - this is outside the import logic, it just there to be able to trigger the rest of the logic
  2. Create a BatchBlock (to be able to limit the number of customers to be processed at the same time)
  3. Create a single MyClass1 item based on the id (TransformBlock<int, MyClass1>)
  4. Perform some logic and generate a collection of MyClass2 (TransformManyBlock<MyClass1, MyClass2>) - as example, sleep for 1 second
  5. Perform some logic on every item of the collection (ActionBlock<MyClass2>) - as example, sleep for 1 second

这是完整的代码:

public static class Program
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });

            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });
            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);
            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure

            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
               {
                   new MyClass2(1, myClass1.Id+ " did this"),
                   new MyClass2(2, myClass1.Id+ " did that"),
                   new MyClass2(3, myClass1.Id+ " did this again")
               };
    }
}

public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

public class MyClass2
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

所以我要努力解决的是终点,我需要打电话给Complete()或等待Completion.我似乎找不到合适的组合.我希望看到如下输出:

So the point I struggle with is the end, where I'd need to call Complete() or wait for Completion. I can't seem to find the right combination. I'd like to see an output as follows:

TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)

[the rest of the items]


Press any key to exit...   

任何人都可以指出我正确的方向吗?

Anyone can point me to the right direction?

推荐答案

您快到了,您需要在管道的第一个块上调用Complete,然后在最后一个块上调用await Completion.然后,在您的链接中,您需要像这样传播完成:

You're almost there, you need to call Complete on the first block in the pipeline then await Completion on the last block. Then in your links you need to propagate completion like this:

private async static void Main(string[] args) {
    var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
    {
        Console.WriteLine($"TransformBlock(id: {id})");
        return new MyClass1(id, "Star Wars");
    });
    var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
    {
        Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
        Thread.Sleep(1000);
        return GetMyClass22Values(myClass1);
    });

    var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
    {
        Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
        Thread.Sleep(1000);
    });

    //propagate completion
    transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true});
    foreach(var id in ids) {
        transformBlock.Post(id);
    }


    //Complete the first block
    transformBlock.Complete();

    //wait for completion to flow to the last block
    await actionBlock.Completion;
} 

您还可以将批处理块合并到管道中,并消除对TryRecieve调用的需求,但这似乎是流程的另一部分.

You can also incorporate the batch block into your pipeline and remove the need for the TryRecieve call but that seems like another part of your flow.

修改

将完成传播到多个块的示例:

Example of propagating completion to multiple blocks:

public async static void Main(string[] args) {

    var sourceBlock = new BufferBlock<int>();

    var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));

    var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));

    sourceBlock.LinkTo(processBlock1);
    sourceBlock.LinkTo(processBlock2);

    var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk => {
        if(!tsk.IsFaulted) {
            processBlock1.Complete();
            processBlock2.Complete();
        } else {
            ((IDataflowBlock)processBlock1).Fault(tsk.Exception);
            ((IDataflowBlock)processBlock2).Fault(tsk.Exception);
        }
    });

    //Send some data...

    sourceBlock.Complete();
    await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
}

这篇关于TPL完成与完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆