动作中的C#异步 [英] C# async within an action
问题描述
我想编写一个接受几个参数的方法,包括一个动作和一个重试量并调用它。
I would like to write a method which accept several parameters, including an action and a retry amount and invoke it.
所以我有这段代码:
public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
{
object lockObj = new object();
int index = 0;
return new Action(async () =>
{
while (true)
{
T item;
lock (lockObj)
{
if (index < source.Count)
{
item = source[index];
index++;
}
else
break;
}
int retry = retries;
while (retry > 0)
{
try
{
bool res = await action(item);
if (res)
retry = -1;
else
//sleep if not success..
Thread.Sleep(200);
}
catch (Exception e)
{
LoggerAgent.LogException(e, method);
}
finally
{
retry--;
}
}
}
}).RunParallel(threads);
}
RunParallel是Action的扩展方法,其外观如下:
RunParallel is an extention method for Action, its look like this:
public static IEnumerable<Task> RunParallel(this Action action, int amount)
{
List<Task> tasks = new List<Task>();
for (int i = 0; i < amount; i++)
{
Task task = Task.Factory.StartNew(action);
tasks.Add(task);
}
return tasks;
}
现在,问题是:线程只是消失或崩溃而没有等待操作完成。
Now, the issue: The thread is just disappearing or collapsing without waiting for the action to finish.
我编写了以下示例代码:
I wrote this example code:
private static async Task ex()
{
List<int> ints = new List<int>();
for (int i = 0; i < 1000; i++)
{
ints.Add(i);
}
var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
{
try
{
List<string> test = await fetchSmthFromDb();
Console.WriteLine("#" + num + " " + test[0]);
return test[0] == "test";
}
catch (Exception e)
{
Console.WriteLine(e.StackTrace);
return false;
}
}, 5, "test");
await Task.WhenAll(tasks);
}
fetchSmthFromDb
为
只要 List< string> test =等待fetchSmthFromDb();
行被调用,线程似乎正在关闭并且 Console.WriteLine(# + num + + test [0]) ;
甚至不会被触发,甚至在调试断点时也不会命中。
Whenever the List<string> test = await fetchSmthFromDb();
row is invoked, the thread seems to be closing and the Console.WriteLine("#" + num + " " + test[0]);
not even being triggered, also when debugging the breakpoint never hit.
最终工作代码
private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
while (true)
{
try
{
await action();
break;
}
catch (Exception e)
{
LoggerAgent.LogException(e, method);
}
if (retryCount <= 0)
break;
retryCount--;
await Task.Delay(200);
};
}
public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
{
Func<T, Task> newAction = async (item) =>
{
await DoWithRetries(async ()=>
{
await action(item);
}, retries, method);
};
await source.ParallelForEachAsync(newAction, threads);
}
推荐答案
问题出在这一行:
return new Action(async () => ...
您可以使用异步lambda启动异步操作,但不要返回等待任务,即它在工作线程上运行,但您永远不会找出完成的时间,然后在异步操作完成之前终止程序-这就是为什么您看不到任何输出的原因。
You start an async operation with the async lambda, but don't return a task to await on. I.e. it runs on worker threads, but you'll never find out when it's done. And your program terminates before the async operation is complete -that's why you don't see any output.
它必须为:
return new Func<Task>(async () => ...
更新
首先,您需要拆分方法的职责,因此您不要将重试策略(不应硬编码为布尔结果的检查)与并行运行的任务混合使用。
First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.
然后,如前所述,您将 while(true)
循环运行100次,而不是并行处理。
Then, as previously mentioned, you run your while (true)
loop 100 times instead of doing things in parallel.
正如@MachineLearning指出的那样,使用 Task.Delay
代替 Thread.Sleep
。
As @MachineLearning pointed out, use Task.Delay
instead of Thread.Sleep
.
总体而言,您的解决方案如下:
Overall, your solution looks like this:
using System.Collections.Async;
static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
while (true)
{
try
{
await action();
break;
}
catch (Exception e)
{
LoggerAgent.LogException(e, method);
}
if (retryCount <= 0)
break;
retryCount--;
await Task.Delay(millisecondsDelay: 200);
};
}
static async Task Example()
{
List<int> ints = new List<int>();
for (int i = 0; i < 1000; i++)
ints.Add(i);
Func<int, Task> actionOnItem =
async item =>
{
await DoWithRetries(async () =>
{
List<string> test = await fetchSmthFromDb();
Console.WriteLine("#" + item + " " + test[0]);
if (test[0] != "test")
throw new InvalidOperationException("unexpected result"); // will be re-tried
},
retryCount: 5,
method: "test");
};
await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}
您需要使用 AsyncEnumerator NuGet程序包,以便使用 ParallelForEachAsync
扩展方法。 c $ c> System.Collections.Async 命名空间。
You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync
extension method from the System.Collections.Async
namespace.
这篇关于动作中的C#异步的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!