动作中的C#异步 [英] C# async within an action

查看:66
本文介绍了动作中的C#异步的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想编写一个接受几个参数的方法,包括一个动作和一个重试量并调用它。

I would like to write a method which accept several parameters, including an action and a retry amount and invoke it.

所以我有这段代码:

public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        object lockObj = new object();
        int index = 0;

        return new Action(async () =>
        {
            while (true)
            {
                T item;
                lock (lockObj)
                {
                    if (index < source.Count)
                    {
                        item = source[index];
                        index++;
                    }
                    else
                        break;
                }

                int retry = retries;
                while (retry > 0)
                {
                    try
                    {
                        bool res = await action(item);
                        if (res)
                            retry = -1;
                        else
                            //sleep if not success..
                            Thread.Sleep(200);

                    }
                    catch (Exception e)
                    {
                        LoggerAgent.LogException(e, method);
                    }
                    finally
                    {
                        retry--;
                    }
                }
            }
        }).RunParallel(threads);
    }

RunParallel是Action的扩展方法,其外观如下:

RunParallel is an extention method for Action, its look like this:

public static IEnumerable<Task> RunParallel(this Action action, int amount)
    {
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < amount; i++)
        {
            Task task = Task.Factory.StartNew(action);
            tasks.Add(task);
        }
        return tasks;
    }

现在,问题是:线程只是消失或崩溃而没有等待操作完成。

Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.

我编写了以下示例代码:

I wrote this example code:

private static async Task ex()
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < 1000; i++)
        {
            ints.Add(i);
        }

        var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
        {
            try
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + num + "  " + test[0]);
                return test[0] == "test";
            }
            catch (Exception e)
            {
                Console.WriteLine(e.StackTrace);
                return false;
            }

        }, 5, "test");

        await Task.WhenAll(tasks);
    }

fetchSmthFromDb

只要 List< string> test =等待fetchSmthFromDb(); 行被调用,线程似乎正在关闭并且 Console.WriteLine(# + num + + test [0]) ; 甚至不会被触发,甚至在调试断点时也不会命中。

Whenever the List<string> test = await fetchSmthFromDb(); row is invoked, the thread seems to be closing and the Console.WriteLine("#" + num + " " + test[0]); not even being triggered, also when debugging the breakpoint never hit.

最终工作代码

private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
    {
        while (true)
        {
            try
            {
                await action();
                break;
            }
            catch (Exception e)
            {
                LoggerAgent.LogException(e, method);
            }

            if (retryCount <= 0)
                break;

            retryCount--;
            await Task.Delay(200);
        };
    }

    public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        Func<T, Task> newAction = async (item) =>
        {
            await DoWithRetries(async ()=>
            {
                await action(item);
            }, retries, method);
        };
        await source.ParallelForEachAsync(newAction, threads);
    }


推荐答案

问题出在这一行:

return new Action(async () => ...

您可以使用异步lambda启动异步操作,但不要返回等待任务,即它在工作线程上运行,但您永远不会找出完成的时间,然后在异步操作完成之前终止程序-这就是为什么您看不到任何输出的原因。

You start an async operation with the async lambda, but don't return a task to await on. I.e. it runs on worker threads, but you'll never find out when it's done. And your program terminates before the async operation is complete -that's why you don't see any output.

它必须为:

return new Func<Task>(async () => ...






更新

首先,您需要拆分方法的职责,因此您不要将重试策略(不应硬编码为布尔结果的检查)与并行运行的任务混合使用。

First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.

然后,如前所述,您将 while(true)循环运行100次,而不是并行处理。

Then, as previously mentioned, you run your while (true) loop 100 times instead of doing things in parallel.

正如@MachineLearning指出的那样,使用 Task.Delay 代替 Thread.Sleep

As @MachineLearning pointed out, use Task.Delay instead of Thread.Sleep.

总体而言,您的解决方案如下:

Overall, your solution looks like this:

using System.Collections.Async;

static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }

        if (retryCount <= 0)
            break;

        retryCount--;
        await Task.Delay(millisecondsDelay: 200);
    };
}

static async Task Example()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
        ints.Add(i);

    Func<int, Task> actionOnItem =
        async item =>
        {
            await DoWithRetries(async () =>
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + item + "  " + test[0]);
                if (test[0] != "test")
                    throw new InvalidOperationException("unexpected result"); // will be re-tried
            },
            retryCount: 5,
            method: "test");
        };

    await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}

您需要使用 AsyncEnumerator NuGet程序包,以便使用 ParallelForEachAsync 扩展方法。 c $ c> System.Collections.Async 命名空间。

You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync extension method from the System.Collections.Async namespace.

这篇关于动作中的C#异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆