如何顺序+并行实现和运行任务集合? [英] How to implement and run the collection of tasks sequentially + in parallel?

查看:49
本文介绍了如何顺序+并行实现和运行任务集合?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Tasks 列表,其中每个元素代表 async Task 方法调用.当我构建这个列表时 - 我决定这个 Task 是需要在前一个任务完成后运行还是与它并行运行(基于配置设置).然而,这并没有像预期的那样工作 - ContinueWith 中的部分过着自己的生活,无论我使用什么选项(请参阅下面的源代码).

日志 - 现在我设置说任务需要按顺序运行(IsSynchronousStart == true):

<块引用>

2021-02-05 13:20:53.616 +02:00 [INF] BatchMasterModule,InitAsync 开始

<块引用>

2021-02-05 13:20:55.258 +02:00 [INF] SqlBatchJobModule,RunAsync 开始

<块引用>

2021-02-05 13:22:11.711 +02:00 [INF] SqlBatchJobModule,RunAsync 结束

<块引用>

2021-02-05 13:22:11.713 +02:00 [INF] FileBatchJobModule,RunAsync 开始

<块引用>

2021-02-05 13:22:11.713 +02:00 [INF] 批处理任务状态为 RanToCompletion

<块引用>

2021-02-05 13:22:14.644 +02:00 [INF] BatchMasterModule,InitAsync 开始

<块引用>

2021-02-05 13:22:16.224 +02:00 [INF] SqlBatchJobModule,RunAsync 开始

<块引用>

2021-02-05 13:22:20.638 +02:00 [INF] FileBatchJobModule,RunAsync 结束 (!!!)

这里有两个任务:batchJobTasks[0] 匹配 SqlBatchJobModulebatchJobTasks[1] 匹配 FileBatchJobModule.

FileBatchJobModule,RunAsync 已启动"- 这必须在SqlBatchJobModule,RunAsync 结束"之后调用;(有时会按预期运行,有时 - 在批处理任务状态为 RanToCompletion"之后)

FileBatchJobModule,RunAsync 结束";- 这必须在批处理任务状态为 RanToCompletion"之前调用;(现在绝对不行)

BatchMasterModule,InitAsync 启动";- 这表示及时调用相同的方法

更新

  1. InitAsync的源代码(缩减——保留了与Task列表创建相关的部分):

     公共类 BatchMasterModule : IBatchMasterModule{公共异步任务 InitAsync([DisableConcurrentExecutionKeyAttribute] FullBatchKeyDto batchKey){...var batchJobs = await _batchJobAppService.GetListForServerAsync(batchKey);...var batchJobTasks = new List();BatchJobDto prevJob = null;foreach(batchJobs 中的 var batchJob){if (_batchJobModuleFactory.TryGet(batchJob.Module?.Runtime, out IBatchJobModule 模块)){...var context = new BatchJobContext(_服务提供者,batchKey.TenantId,批处理作业);...if (prevJob == null || prevJob.IsSynchronousStart.Value == false){batchJobTasks.Add(module.RunAsync(context));}别的{var previousTask = batchJobTasks.Last();previousTask = previousTask.ContinueWith(result => module.RunAsync(context));}prevJob = 批处理作业;}别的{throw new ArgumentNullException(nameof(batchJob.Module.Runtime));}}等待任务.WhenAll(batchJobTasks).ContinueWith(result =>{if (batch.PrintReport.HasValue && batch.PrintReport.Value == true){//TODO:只有在成功执行作业时才执行报告?Log.Information($"Batch tasks status is {result.Status}. OK, will print report at {DateTime.Now}");}});}}

  2. FileBatchJobModule - 未按预期调用 - 而不是在前一个模块 - SqlBatchModule - 完成时调用,而是在某些随机时刻调用,并且在 WhenAll 完成之后完成,而不是之前(正如我所期望的那样):

     公共类 FileBatchJobModule : IBatchJobModule{public FuncOnCompletedAsync { 获取;放;}公共异步任务 RunAsync(IBatchJobContext 上下文){//去做Log.Information($"FileBatchJobModule, RunAsync 开始于 {DateTime.Now}");const int defaultDelay = 15000;等待任务.延迟(context.Parameters.TryGetValue("delay", out string delayString)?(int.TryParse(delayString, out int delay) ? delay : defaultDelay):默认延迟);byte[] encodingText = Encoding.Unicode.GetBytes($"在{DateTime.Now}{Environment.NewLine}处更新来自异步调用的文本");使用 (var sourceStream = new FileStream(@C:\CentralTools\Files\filebatchjobmodule.txt",FileMode.Append,FileAccess.Write,文件共享.无,缓冲区大小:4096,useAsync: 真)){等待 sourceStream.WriteAsync(encodedText, 0, encodingText.Length);};如果(OnCompletedAsync != null){等待 OnCompletedAsync.Invoke(context);}Log.Information($"FileBatchJobModule, RunAsync 于 {DateTime.Now} 结束");}}

解决方案

我通过从 List 切换到 List 解决了我的问题.我觉得可能是这样...

 if (prevJob == null || prevJob.IsSynchronousStart.Value == false){batchJobTasks.Add(async () => await module.RunAsync(context));}别的{var previousTask = batchJobTasks.Last();batchJobTasks.RemoveAt(batchJobTasks.Count - 1);batchJobTasks.Add(async () => { await previousTask(); await module.RunAsync(context); });}

I have a list of Tasks, where each element represents async Task method call. When I construct this list - I make decision on whether this Task needs to be run after the previous one has completed or in parallel with it (based on configuration settings). However, this does not work as expected - the part from ContinueWith lives its own life, no matter what options I use for it (please see piece of source code below).

Log - now I have setting saying the tasks need to be run sequentially (IsSynchronousStart == true):

2021-02-05 13:20:53.616 +02:00 [INF] BatchMasterModule, InitAsync started

2021-02-05 13:20:55.258 +02:00 [INF] SqlBatchJobModule, RunAsync started

2021-02-05 13:22:11.711 +02:00 [INF] SqlBatchJobModule, RunAsync ended

2021-02-05 13:22:11.713 +02:00 [INF] FileBatchJobModule, RunAsync started

2021-02-05 13:22:11.713 +02:00 [INF] Batch tasks status is RanToCompletion

2021-02-05 13:22:14.644 +02:00 [INF] BatchMasterModule, InitAsync started

2021-02-05 13:22:16.224 +02:00 [INF] SqlBatchJobModule, RunAsync started

2021-02-05 13:22:20.638 +02:00 [INF] FileBatchJobModule, RunAsync ended (!!!)

There are two tasks here: batchJobTasks[0] matches SqlBatchJobModule, batchJobTasks[1] matches FileBatchJobModule.

"FileBatchJobModule, RunAsync started" - this had to be called after "SqlBatchJobModule, RunAsync ended" (this is SOMETIMES run as expected, sometimes - after "Batch tasks status is RanToCompletion")

"FileBatchJobModule, RunAsync ended" - this had to be called before "Batch tasks status is RanToCompletion" (is NEVER OK now)

"BatchMasterModule, InitAsync started" - this represents a scheduled invocation of the same method in time

UPDATE

  1. the source code of InitAsync (reduced - the part related to Task list creation retained):

     public class BatchMasterModule : IBatchMasterModule
     {
         public async Task InitAsync([DisableConcurrentExecutionKeyAttribute] FullBatchKeyDto batchKey)
         {
             ...
    
             var batchJobs = await _batchJobAppService.GetListForServerAsync(batchKey);
    
             ...
    
             var batchJobTasks = new List<Task>();
    
             BatchJobDto prevJob = null;
    
             foreach (var batchJob in batchJobs)
             {
                 if (_batchJobModuleFactory.TryGet(batchJob.Module?.Runtime, out IBatchJobModule module))
                 {
                     ...
                     var context = new BatchJobContext
                     (
                         _serviceProvider,
                         batchKey.TenantId,
                         batchJob
                     );
    
                     ...
    
                     if (prevJob == null || prevJob.IsSynchronousStart.Value == false)
                     {
                         batchJobTasks.Add(module.RunAsync(context));
                     }
                     else
                     {
                         var previousTask = batchJobTasks.Last();
                         previousTask = previousTask.ContinueWith(result => module.RunAsync(context));
                     }
    
                     prevJob = batchJob;
                 }
                 else
                 {
                     throw new ArgumentNullException(nameof(batchJob.Module.Runtime));
                 }
             }
    
             await Task
                 .WhenAll(batchJobTasks)
                 .ContinueWith(result =>
                 {
                     if (batch.PrintReport.HasValue && batch.PrintReport.Value == true)
                     {
                         //TODO: perform reporting only if successful job execution?
                         Log.Information($"Batch tasks status is {result.Status}. OK, will print report at {DateTime.Now}");
                     }
                 });
           }
      }
    

  2. FileBatchJobModule - which is not called as expected - instead of being called when the previous module - SqlBatchModule - is completed it is called in some random moments and completed after WhenAll completes, not before (as I would expect too):

     public class FileBatchJobModule : IBatchJobModule
     {
         public Func<IBatchJobContext, Task> OnCompletedAsync { get; set; }
    
         public async Task RunAsync(IBatchJobContext context)
         {
             //TODO
             Log.Information($"FileBatchJobModule, RunAsync started at {DateTime.Now}");
             const int defaultDelay = 15000;
             await Task.Delay(
                 context.Parameters.TryGetValue("delay", out string delayString)
                 ? (int.TryParse(delayString, out int delay) ? delay : defaultDelay)
                 : defaultDelay);
             byte[] encodedText = Encoding.Unicode.GetBytes($"Updating text from async call at {DateTime.Now}{Environment.NewLine}");
             using (var sourceStream = new FileStream(
                 @"C:\CentralTools\Files\filebatchjobmodule.txt",
                 FileMode.Append,
                 FileAccess.Write,
                 FileShare.None,
                 bufferSize: 4096,
                 useAsync: true))
             {
                 await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
             };
             if (OnCompletedAsync != null)
             {
                 await OnCompletedAsync.Invoke(context);
             }
             Log.Information($"FileBatchJobModule, RunAsync ended at {DateTime.Now}");
         }
     }
    

解决方案

I resolved my issue by switching from List to List<Func>. I felt it could be the case...

                if (prevJob == null || prevJob.IsSynchronousStart.Value == false)
                {
                    batchJobTasks.Add(async () => await module.RunAsync(context));
                }
                else
                {
                    var previousTask = batchJobTasks.Last();
                    batchJobTasks.RemoveAt(batchJobTasks.Count - 1);
                    batchJobTasks.Add(async () => { await previousTask(); await module.RunAsync(context); });
                }

这篇关于如何顺序+并行实现和运行任务集合?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆