How to implement and run a collection of tasks sequentially + in parallel?
Problem description
I have a list of Tasks, where each element represents an async Task method call. When I build this list, I decide whether each Task should run after the previous one has completed or in parallel with it (based on configuration settings). However, this does not work as expected: the part inside ContinueWith lives a life of its own, no matter which options I pass to it (see the source code below).

Log, with the setting that says the tasks must run sequentially (IsSynchronousStart == true):
2021-02-05 13:20:53.616 +02:00 [INF] BatchMasterModule, InitAsync started
2021-02-05 13:20:55.258 +02:00 [INF] SqlBatchJobModule, RunAsync started
2021-02-05 13:22:11.711 +02:00 [INF] SqlBatchJobModule, RunAsync ended
2021-02-05 13:22:11.713 +02:00 [INF] FileBatchJobModule, RunAsync started
2021-02-05 13:22:11.713 +02:00 [INF] Batch tasks status is RanToCompletion
2021-02-05 13:22:14.644 +02:00 [INF] BatchMasterModule, InitAsync started
2021-02-05 13:22:16.224 +02:00 [INF] SqlBatchJobModule, RunAsync started
2021-02-05 13:22:20.638 +02:00 [INF] FileBatchJobModule, RunAsync ended (!!!)
There are two tasks here: batchJobTasks[0] corresponds to SqlBatchJobModule, and batchJobTasks[1] corresponds to FileBatchJobModule.

"FileBatchJobModule, RunAsync started" - this should be logged after "SqlBatchJobModule, RunAsync ended" (SOMETIMES it is, and sometimes it appears after "Batch tasks status is RanToCompletion")
"FileBatchJobModule, RunAsync ended" - this should be logged before "Batch tasks status is RanToCompletion" (this is NEVER the case now)
"BatchMasterModule, InitAsync started" - this is the next scheduled invocation of the same method
UPDATE

The source code of InitAsync (reduced; the part related to Task list creation is retained):

public class BatchMasterModule : IBatchMasterModule
{
    public async Task InitAsync([DisableConcurrentExecutionKeyAttribute] FullBatchKeyDto batchKey)
    {
        ...
        var batchJobs = await _batchJobAppService.GetListForServerAsync(batchKey);
        ...
        var batchJobTasks = new List<Task>();
        BatchJobDto prevJob = null;

        foreach (var batchJob in batchJobs)
        {
            if (_batchJobModuleFactory.TryGet(batchJob.Module?.Runtime, out IBatchJobModule module))
            {
                ...
                var context = new BatchJobContext
                (
                    _serviceProvider,
                    batchKey.TenantId,
                    batchJob
                );
                ...
                if (prevJob == null || prevJob.IsSynchronousStart.Value == false)
                {
                    batchJobTasks.Add(module.RunAsync(context));
                }
                else
                {
                    var previousTask = batchJobTasks.Last();
                    previousTask = previousTask.ContinueWith(result => module.RunAsync(context));
                }

                prevJob = batchJob;
            }
            else
            {
                throw new ArgumentNullException(nameof(batchJob.Module.Runtime));
            }
        }

        await Task
            .WhenAll(batchJobTasks)
            .ContinueWith(result =>
            {
                if (batch.PrintReport.HasValue && batch.PrintReport.Value == true)
                {
                    //TODO: perform reporting only if successful job execution?
                    Log.Information($"Batch tasks status is {result.Status}. OK, will print report at {DateTime.Now}");
                }
            });
    }
}
FileBatchJobModule, which is not called as expected: instead of being called when the previous module (SqlBatchModule) completes, it is called at seemingly random moments and finishes after WhenAll completes, not before (as I would also expect):

public class FileBatchJobModule : IBatchJobModule
{
    public Func<IBatchJobContext, Task> OnCompletedAsync { get; set; }

    public async Task RunAsync(IBatchJobContext context)
    {
        //TODO
        Log.Information($"FileBatchJobModule, RunAsync started at {DateTime.Now}");

        const int defaultDelay = 15000;
        await Task.Delay(
            context.Parameters.TryGetValue("delay", out string delayString)
                ? (int.TryParse(delayString, out int delay) ? delay : defaultDelay)
                : defaultDelay);

        byte[] encodedText = Encoding.Unicode.GetBytes($"Updating text from async call at {DateTime.Now}{Environment.NewLine}");

        using (var sourceStream = new FileStream(
            @"C:\CentralTools\Files\filebatchjobmodule.txt",
            FileMode.Append, FileAccess.Write, FileShare.None,
            bufferSize: 4096, useAsync: true))
        {
            await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
        }

        if (OnCompletedAsync != null)
        {
            await OnCompletedAsync.Invoke(context);
        }

        Log.Information($"FileBatchJobModule, RunAsync ended at {DateTime.Now}");
    }
}
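
One plausible reading of the log, offered as an interpretation of the posted code rather than a confirmed diagnosis: in the else branch the result of ContinueWith is assigned to the local variable previousTask and is never stored back into batchJobTasks, so Task.WhenAll only awaits the original task. In addition, ContinueWith with a Task-returning delegate yields a Task<Task>, and the outer task completes as soon as the inner job has been started, not when it finishes. The sketch below shows a version that stores the continuation back into the list and calls Unwrap. RunJobAsync, the job names and the delays are hypothetical stand-ins for module.RunAsync(context), not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical stand-in for module.RunAsync(context).
    private static async Task RunJobAsync(string name, int delayMs, List<string> log)
    {
        lock (log) { log.Add($"{name} started"); }
        await Task.Delay(delayMs);
        lock (log) { log.Add($"{name} ended"); }
    }

    public static async Task<List<string>> RunSequentiallyAsync()
    {
        var log = new List<string>();
        var tasks = new List<Task>();

        // First job starts immediately, as in the question.
        tasks.Add(RunJobAsync("Sql", 50, log));

        // ContinueWith returns a NEW task. Because the delegate itself
        // returns a Task, the result is a Task<Task>.
        Task<Task> nested = tasks[tasks.Count - 1]
            .ContinueWith(_ => RunJobAsync("File", 50, log));

        // Unwrap() gives a task that completes only when the inner job
        // completes. It replaces the list element so that WhenAll sees it.
        tasks[tasks.Count - 1] = nested.Unwrap();

        await Task.WhenAll(tasks);
        lock (log) { log.Add("WhenAll completed"); }
        return log;
    }
}
```

Note that ContinueWith runs its delegate even when the antecedent task has faulted, unless TaskContinuationOptions say otherwise, so awaiting inside an async method is usually easier to reason about than chaining continuations by hand.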
I resolved my issue by switching from List<Task> to List<Func<Task>>, so that a job is started only when its delegate is invoked. I suspected this might be the cause...
if (prevJob == null || prevJob.IsSynchronousStart.Value == false)
{
    batchJobTasks.Add(async () => await module.RunAsync(context));
}
else
{
    var previousTask = batchJobTasks.Last();
    batchJobTasks.RemoveAt(batchJobTasks.Count - 1);
    batchJobTasks.Add(async () => { await previousTask(); await module.RunAsync(context); });
}
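
The snippet above only builds the list of delegates; nothing runs until each Func<Task> is invoked. The post does not show that part, so the following is a minimal self-contained sketch under the assumption that all delegates are started together and awaited with Task.WhenAll. Job, RunJobAsync and the job names are hypothetical; IsSynchronousStart only loosely mirrors the flag on the question's BatchJobDto.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DeferredBatchDemo
{
    // Hypothetical job description, not the question's BatchJobDto.
    public sealed class Job
    {
        public Job(string name, bool isSynchronousStart)
        {
            Name = name;
            IsSynchronousStart = isSynchronousStart;
        }

        public string Name { get; }
        public bool IsSynchronousStart { get; }
    }

    // Hypothetical stand-in for module.RunAsync(context).
    private static async Task RunJobAsync(string name, List<string> log)
    {
        lock (log) { log.Add($"{name} started"); }
        await Task.Delay(20);
        lock (log) { log.Add($"{name} ended"); }
    }

    public static async Task<List<string>> RunAsync(IEnumerable<Job> jobs)
    {
        var log = new List<string>();
        var steps = new List<Func<Task>>();
        // Equivalent to "prevJob != null && prevJob.IsSynchronousStart".
        bool chainToPrevious = false;

        foreach (var job in jobs)
        {
            if (!chainToPrevious)
            {
                // Independent step: runs in parallel with the other steps.
                steps.Add(() => RunJobAsync(job.Name, log));
            }
            else
            {
                // Wrap the previous step so this job starts only after it.
                var previousStep = steps[steps.Count - 1];
                steps[steps.Count - 1] = async () =>
                {
                    await previousStep();
                    await RunJobAsync(job.Name, log);
                };
            }

            chainToPrevious = job.IsSynchronousStart;
        }

        // Invoke every delegate exactly once, then wait for all of them.
        await Task.WhenAll(steps.Select(step => step()));
        lock (log) { log.Add("all done"); }
        return log;
    }
}
```

Calling RunAsync with a "Sql" job whose IsSynchronousStart is true followed by a "File" job should log the Sql job's start and end before the File job starts, and "all done" last. Jobs that are not chained run concurrently, so their relative order in the log is not guaranteed.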