在 C# 中排队异步任务 [英] Queuing asynchronous task in C#

查看:22
本文介绍了在 C# 中排队异步任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有几个方法可以向数据库报告一些数据.我们希望异步调用对数据服务的所有调用.这些对数据服务的调用已经结束,因此我们要确保这些 DS 调用在任何给定时间按顺序依次执行.最初,我对这些方法中的每一个都使用异步等待,并且每个调用都是异步执行的,但是我们发现它们是否不按顺序进行,那么就有出错的余地.

所以,我认为我们应该将所有这些异步任务排队并在单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了 'SemaphoreSlim' .这是否适合我的用例?或者还有哪些其他选项适合我的用例?请指导我.

那么,我目前的代码中有什么

public static SemaphoreSlim mutex = new SemaphoreSlim(1);//第一个DS调用公共异步任务 SendModuleDataToDSAsync(模块参数){var tasks1 = new List<任务>();var tasks2 = new List<任务>();//等待互斥锁.WaitAsync();**//这是使用 SemaphoreSlim 的正确方法吗?**foreach(Module.param 中的 var 设置){任务 job1 = SaveModule(setting);任务1.添加(工作1);任务 job2= SaveModule(GetAdvancedData(setting));任务2.添加(工作2);}等待 Task.WhenAll(tasks1);等待 Task.WhenAll(tasks2);//mutex.Release();//**这个对吗?**}私有异步任务 SaveModule(模块设置){等待Task.Run(() =>{//调用 DS...});}

//在主线程的某个地方,调用对 DS 的第二次调用

//第二次 DS 调用私有异步任务 SendInstrumentSettingsToDS(<param1>, <param2>){//await mutex.WaitAsync();//**这是正确的吗?**等待Task.Run(() =>{//TrackInstrumentInfoToDS//mutex.Release();//**这是正确的吗?**});如果(参数2){等待Task.Run(() =>{//TrackParam2InstrumentInfoToDS});}}

解决方案

最初,我对这些方法中的每一个都使用异步等待,并且每个调用都是异步执行的,但我们发现它们是否不按顺序进行,那么就有出错的余地.

<块引用>

所以,我认为我们应该将所有这些异步任务排队并在单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了SemaphoreSlim".

SemaphoreSlim 确实将异步代码限制为一次运行一个,并且是互斥 的有效形式.然而,由于乱序"调用可能会导致错误,那么 SemaphoreSlim 不是一个合适的解决方案,因为它不保证 FIFO.

从更一般的意义上说,没有同步原语可以保证 FIFO,因为这可能会由于锁车队等副作用而导致问题.另一方面,数据结构自然是严格的 FIFO.

因此,您需要使用自己的 FIFO 队列,而不是使用隐式执行队列.Channels 是一个不错的、高性能的、异步兼容的队列,但是由于您使用的是旧版本的 C#/.NET,所以 BlockingCollection<T> 可以工作:

公共密封类ExecutionQueue{私有只读 BlockingCollection<Func<Task>>_queue = new BlockingCollection<Func<Task>>();公共执行队列()=>完成 = Task.Run(() => ProcessQueueAsync());公共任务完成{获取;}公共无效完成()=>_queue.CompleteAdding();私有异步任务 ProcessQueueAsync(){foreach(_queue.GetConsumingEnumerable() 中的变量值)等待值();}}

此设置唯一棘手的部分是如何排队工作.从排队工作的代码的角度来看,他们想知道 lambda 何时执行,而不是 lambda 何时排队.从队列方法(我称之为Run)的角度来看,该方法只需要在执行完lambda 后完成其返回的任务.所以,你可以这样写队列方法:

public Task Run(Func lambda){var tcs = new TaskCompletionSource();_queue.Add(async() =>{//执行 lambda 并将结果传播到从 Run 返回的 Task尝试{等待 lambda();tcs.TrySetResult(null);}捕捉(OperationCanceledException ex){tcs.TrySetCanceled(ex.CancellationToken);}捕捉(例外前){tcs.TrySetException(ex);}});返回 tcs.Task;}

这种排队方法并不完美.如果一个任务完成时出现多个异常(这对于并行代码是正常的),则只保留第一个异常(这对于异步代码是正常的).OperationCanceledException 处理还有一个边缘情况.但是这段代码对于大多数情况来说已经足够好了.

现在你可以像这样使用它了:

public static ExecutionQueue _queue = new ExecutionQueue();公共异步任务 SendModuleDataToDSAsync(模块参数){var tasks1 = new List<任务>();var tasks2 = new List<任务>();foreach(Module.param 中的 var 设置){任务job1 = _queue.Run(() => SaveModule(setting));任务1.添加(工作1);任务job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));任务2.添加(工作2);}等待 Task.WhenAll(tasks1);等待 Task.WhenAll(tasks2);}

I have few methods that report some data to Data base. We want to invoke all calls to Data service asynchronously. These calls to data service are all over and so we want to make sure that these DS calls are executed one after another in order at any given time. Initially, i was using async await on each of these methods and each of the calls were executed asynchronously but we found out if they are out of sequence then there are room for errors.

So, i thought we should queue all these asynchronous tasks and send them in a separate thread but i want to know what options we have? I came across 'SemaphoreSlim' . Will this be appropriate in my use case? Or what other options will suit my use case? Please, guide me.

So, what i have in my code currently

public static SemaphoreSlim mutex = new SemaphoreSlim(1);

//first DS call 

 public async Task SendModuleDataToDSAsync(Module parameters)
    {
        var tasks1 = new List<Task>();
        var tasks2 = new List<Task>();

        //await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
        foreach (var setting in Module.param)
        {
           Task job1 = SaveModule(setting);
           tasks1.Add(job1);
           Task job2= SaveModule(GetAdvancedData(setting));
           tasks2.Add(job2);
        }

        await Task.WhenAll(tasks1);
        await Task.WhenAll(tasks2);

        //mutex.Release(); // **is this correct?**
    }

 private async Task SaveModule(Module setting)
    {
        await Task.Run(() =>
            {
             // Invokes Calls to DS
             ... 
            });
    }

//somewhere down the main thread, invoking second call to DS

  //Second DS Call
 private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
 {
    //await mutex.WaitAsync();// **is this correct?**
    await Task.Run(() =>
            {
                 //TrackInstrumentInfoToDS
                 //mutex.Release();// **is this correct?**
            });
    if(param2)
    {
        await Task.Run(() =>
               {
                  //TrackParam2InstrumentInfoToDS
               });
    }
 }

解决方案

Initially, i was using async await on each of these methods and each of the calls were executed asynchronously but we found out if they are out of sequence then there are room for errors.

So, i thought we should queue all these asynchronous tasks and send them in a separate thread but i want to know what options we have? I came across 'SemaphoreSlim' .

SemaphoreSlim does restrict asynchronous code to running one at a time, and is a valid form of mutual exclusion. However, since "out of sequence" calls can cause errors, then SemaphoreSlim is not an appropriate solution since it does not guarantee FIFO.

In a more general sense, no synchronization primitive guarantees FIFO because that can cause problems due to side effects like lock convoys. On the other hand, it is natural for data structures to be strictly FIFO.

So, you'll need to use your own FIFO queue, rather than having an implicit execution queue. Channels is a nice, performant, async-compatible queue, but since you're on an older version of C#/.NET, BlockingCollection<T> would work:

public sealed class ExecutionQueue
{
  private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

  public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

The only tricky part with this setup is how to queue work. From the perspective of the code queueing the work, they want to know when the lambda is executed, not when the lambda is queued. From the perspective of the queue method (which I'm calling Run), the method needs to complete its returned task only after the lambda is executed. So, you can write the queue method something like this:

public Task Run(Func<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(async () =>
  {
    // Execute the lambda and propagate the results to the Task returned from Run
    try
    {
      await lambda();
      tcs.TrySetResult(null);
    }
    catch (OperationCanceledException ex)
    {
      tcs.TrySetCanceled(ex.CancellationToken);
    }
    catch (Exception ex)
    {
      tcs.TrySetException(ex);
    }
  });
  return tcs.Task;
}

This queueing method isn't as perfect as it could be. If a task completes with more than one exception (this is normal for parallel code), only the first one is retained (this is normal for async code). There's also an edge case around OperationCanceledException handling. But this code is good enough for most cases.

Now you can use it like this:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
  var tasks1 = new List<Task>();
  var tasks2 = new List<Task>();

  foreach (var setting in Module.param)
  {
    Task job1 = _queue.Run(() => SaveModule(setting));
    tasks1.Add(job1);
    Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
    tasks2.Add(job2);
  }

  await Task.WhenAll(tasks1);
  await Task.WhenAll(tasks2);
}

这篇关于在 C# 中排队异步任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆