Why aren't the tasks running in parallel?

Question

I have a number of locations (called cells) where I run tests. The tests are implemented as asynchronous tasks and run sequentially. The user can select which tests to run for each cell. If I select exactly the same tests on all cells, they run more or less in parallel.

Given tests A, B and C, if I select tests A and B on cells 1 and 2 and only test C on cell 3, then for some reason the tests in cells 1 and 2 start running, but test C in cell 3 does not start until tests A and B in cells 1 and 2 have finished. Basically, the tests in all cells tend to run in the same order. That is not what I want. What I am trying to achieve is for each cell's chain of tests to run independently of the other cells. Here is how I implemented it:

private async void buttonStartTest_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells.Count == 0) // ToList() never returns null, so check for an empty selection instead
        return;

    var blockPrepare = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(Tests.Prepare), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    var blockFinalize = CreateExceptionCatchingActionBlock(new Func<Cell, Task>(Tests.Finalize), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    List<IPropagatorBlock<Cell, Cell>> blockList = new List<IPropagatorBlock<Cell, Cell>>();
    var funcs = tests.Select(x => x.Value);
    foreach (var func in funcs)
    {
        var blockNew = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(func), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = 40,
        });
        blockList.Add(blockNew);
    }

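    // link the test blocks in sequence; every link needs PropagateCompletion, otherwise (with two or more tests)
    // completion stops after blockList[0] and the await on blockFinalize.Completion below never returns
    for (int i = 0; i < blockList.Count - 1; i++)
    {
        var b1 = blockList[i];
        var b2 = blockList[i + 1];
        b1.LinkTo(b2, new DataflowLinkOptions { PropagateCompletion = true });
    }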

    // link first and last
    blockPrepare.LinkTo(blockList[0], new DataflowLinkOptions { PropagateCompletion = true });
    blockList[blockList.Count - 1].LinkTo(blockFinalize, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        c.Reset();
        c.State = Cell.States.InProgress;
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    }

    blockPrepare.Complete();
    try
    {
        await blockFinalize.Completion;
    }
    catch (Exception ex)
    {
        logger.Debug(ex.GetBaseException().Message); // innermost exception, without assuming exactly two levels of nesting
    }
}

Above you can see the two mandatory blocks for each cell, prepare and finalize. Here is how I create them:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
                Func<TInput, Task<TOutput>> transform,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}

public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                Func<TInput, Task> action,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new ActionBlock<TInput>(async input =>
    {
        try
        {
            await action(input);
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));
        }
    }, dataflowBlockOptions);
}
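
One consequence of this helper pattern is easier to see in isolation: when a test throws, the TransformManyBlock returns an empty sequence, so that cell is silently dropped from the pipeline. It skips every remaining test and never reaches the Finalize block. The minimal sketch below assumes plain int IDs in place of Cell objects and a hypothetical test that fails for cell 2; `CreateCatching` is a simplified, int-typed copy of the pattern above, not the asker's exact method. It needs the System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DropOnErrorDemo
{
    // Simplified, int-typed version of the exception-catching transform pattern:
    // a failing input produces zero outputs instead of faulting the whole block.
    public static TransformManyBlock<int, int> CreateCatching(
        Func<int, Task<int>> transform, Action<Exception, int> onError) =>
        new TransformManyBlock<int, int>(async input =>
        {
            try
            {
                return new[] { await transform(input) };
            }
            catch (Exception ex)
            {
                onError(ex, input);
                return Enumerable.Empty<int>();
            }
        });

    // Returns which cells reached the finalize block and which hit the error handler.
    public static async Task<(int[] Finalized, int[] Errors)> RunAsync()
    {
        var errors = new ConcurrentQueue<int>();
        var finalized = new ConcurrentQueue<int>();

        // Hypothetical test that fails for cell 2 only.
        var test = CreateCatching(
            async cell =>
            {
                await Task.Yield();
                if (cell == 2) throw new InvalidOperationException("test failed");
                return cell;
            },
            (ex, cell) => errors.Enqueue(cell));

        var finalize = new ActionBlock<int>(cell => finalized.Enqueue(cell));
        test.LinkTo(finalize, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var cell in new[] { 1, 2, 3 })
            test.Post(cell);
        test.Complete();
        await finalize.Completion;

        return (finalized.ToArray(), errors.ToArray());
    }

    public static async Task Main()
    {
        var (finalizedCells, failedCells) = await RunAsync();
        Console.WriteLine("finalized: " + string.Join(",", finalizedCells)); // 1,3
        Console.WriteLine("errors:    " + string.Join(",", failedCells));    // 2
    }
}
```

If every cell should be finalized even after a failure, the catch branch would have to forward the cell (for example with a failed flag) instead of returning an empty sequence.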

The test itself looks like this:

public static async Task<Cell> TestDoorsAsync(Cell c)
{
    int thisTestID = TEST_DOORS;
    TestConfiguration conf = c.GetConfiguration(thisTestID);
    if (conf.Enabled)
    {
       ... // execute test
    }
    else
    {
       // report that test was skipped due to user configuration
    }

    return c;
}

So is there some option that I missed, or is the software design wrong in a way that prevents the tests in one cell from running without waiting for the tests in the other cells to complete?

UPDATE

The repo is a minimal console app demonstrating the issue.

There are still 3 cells and 3 tests (tasks). On cells 1 and 2 I select all tests, while on cell 3 I select only test 3. What I expect is that right after the preparation task for cell 3 finishes, I immediately see tests 1 and 2 skipped and test 3 running.

What I see is (# = cell number):

#1 Preparing...
#2 Preparing...
#3 Preparing...

#1 Test1 running...
#2 Test1 running...
#3 Test1 skipped
#1 Test2 running...
#2 Test2 running...
#3 Test2 skipped
#1 Test3 running...
#2 Test3 running...
#3 Test3 running...

#2 Finalizing...
#1 Finalizing...
#3 Finalizing...

The tests in cell 3 are synchronized with the tests in cells 1 and 2. All tests finish at the same time, whereas the single test in cell 3 should have finished earlier than the tests in the other cells.

Solution

Thanks for the edit. Add EnsureOrdered = false to the block options. What's happening is that your transform blocks (TransformManyBlock in your helper) do not pass a cell along until all the cells posted before it are done processing, so that they can preserve the input order. This is the default and usually preferable, but not in your case.
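
A minimal sketch of the effect, assuming plain int IDs stand in for the question's Cell objects and Task.Delay stands in for a test (cells 1 and 2 run a 500 ms test, cell 3 only skips and takes about 10 ms). It pushes the three cells through one TransformBlock with MaxDegreeOfParallelism = 40, as in the question, and records the order in which they reach a downstream block. It needs the System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Pushes three hypothetical cells (plain int IDs) through one TransformBlock
    // and returns the order in which they reach the downstream block.
    public static async Task<int[]> RunAsync(bool ensureOrdered)
    {
        var arrived = new ConcurrentQueue<int>();

        var test = new TransformBlock<int, int>(async cell =>
        {
            // Simulated work: cells 1 and 2 run a long test, cell 3 only skips.
            await Task.Delay(cell == 3 ? 10 : 500);
            return cell;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 40,   // as in the question
            EnsureOrdered = ensureOrdered, // true is the default
        });

        var finalize = new ActionBlock<int>(cell => arrived.Enqueue(cell));
        test.LinkTo(finalize, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var cell in new[] { 1, 2, 3 })
            test.Post(cell);
        test.Complete();
        await finalize.Completion;

        return arrived.ToArray();
    }

    public static async Task Main()
    {
        Console.WriteLine("ordered:   " + string.Join(",", await RunAsync(ensureOrdered: true)));
        Console.WriteLine("unordered: " + string.Join(",", await RunAsync(ensureOrdered: false)));
    }
}
```

With the default (EnsureOrdered = true) the first run prints 1,2,3: cell 3 finishes its work early but is held inside the block until cells 1 and 2 are done. With EnsureOrdered = false, cell 3 arrives first. In the question's code the option would go into every ExecutionDataflowBlockOptions passed to the helpers, since any block left ordered reintroduces the wait. The option only matters when MaxDegreeOfParallelism is greater than 1; with 1, a block processes one cell at a time anyway.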

It looks like I was wrong when I commented that there was nothing wrong with the current code.
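
If the cells never need to share a pipeline, a different design (not the one suggested in the answer above) avoids ordering concerns altogether: give each cell its own sequential async chain and run all chains concurrently. The sketch below uses a simplified, hypothetical Cell class (an ID, a set of enabled test names and a log) and hypothetical tests that only call Task.Delay; it illustrates the idea under those assumptions and is not the asker's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the question's Cell class.
public sealed class Cell
{
    public int Id { get; }
    public HashSet<string> EnabledTests { get; }
    public List<string> Log { get; } = new List<string>();

    public Cell(int id, params string[] enabledTests)
    {
        Id = id;
        EnabledTests = new HashSet<string>(enabledTests);
    }
}

public static class PerCellChains
{
    // One cell's chain: prepare, every test in order (skipping disabled ones), finalize.
    // Only this chain touches cell.Log, so no locking is needed.
    public static async Task RunCellAsync(Cell cell, IReadOnlyList<(string Name, Func<Cell, Task> Run)> tests)
    {
        cell.Log.Add("prepare");
        foreach (var (name, run) in tests)
        {
            if (cell.EnabledTests.Contains(name))
            {
                await run(cell);
                cell.Log.Add(name + " ran");
            }
            else
            {
                cell.Log.Add(name + " skipped");
            }
        }
        cell.Log.Add("finalize");
    }

    public static async Task Main()
    {
        // Hypothetical tests that only simulate work.
        var tests = new List<(string Name, Func<Cell, Task> Run)>
        {
            ("A", c => Task.Delay(300)),
            ("B", c => Task.Delay(300)),
            ("C", c => Task.Delay(50)),
        };
        var cells = new[] { new Cell(1, "A", "B"), new Cell(2, "A", "B"), new Cell(3, "C") };

        // All chains start at once; no chain waits for another cell.
        var chains = cells.Select(c => RunCellAsync(c, tests)).ToArray();
        var first = await Task.WhenAny(chains);
        Console.WriteLine("first finished: cell " + cells[Array.IndexOf(chains, first)].Id);
        await Task.WhenAll(chains);

        foreach (var c in cells)
            Console.WriteLine($"#{c.Id}: {string.Join(", ", c.Log)}");
    }
}
```

Here cell 3 skips A and B immediately and finishes after roughly 50 ms, while cells 1 and 2 are still running. The trade-off is that the global throttling the dataflow options provided (MaxDegreeOfParallelism, BoundedCapacity) would have to be rebuilt by hand, for example with a SemaphoreSlim, if it is needed.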
