How to limit the number of async IO tasks to a database?


Question

I have a list of ids, and I want to fetch the data for each id from the database in parallel. My ExecuteAsync method below is called at very high throughput, and each request carries around 500 ids for which I need to extract data.

So I wrote the code below, which loops over the list of ids and makes an async call for each id in parallel. It works fine.

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class
{
    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
    {
        tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
    }

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    var excludeNull = new List<T>(ids.Count);
    for (int i = 0; i < responses.Length; i++)
    {
        var response = responses[i];
        if (response != null)
        {
            excludeNull.Add(response);
        }
    }
    return excludeNull;
}

private async Task<T> Execute<T>(IPollyPolicy policy,
    Func<CancellationToken, Task<T>> requestExecuter) where T : class
{
    var response = await policy.Policy.ExecuteAndCaptureAsync(
        ct => requestExecuter(ct), CancellationToken.None);
    if (response.Outcome == OutcomeType.Failure)
    {
        if (response.FinalException != null)
        {
            // log error
            throw response.FinalException;
        }
    }

    return response?.Result;
}

Problem:

As you can see, I loop over all the ids and make a bunch of async calls to the database in parallel, one per id, which can put a lot of load on the database (depending on how many requests are coming in). So I want to limit the number of async calls we make to the database. I modified ExecuteAsync to use a semaphore as shown below, but it doesn't look like it does what I want:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
    Func<CancellationToken, int, Task<T>> mapper) where T : class
{
    var throttler = new SemaphoreSlim(250);
    var tasks = new List<Task<T>>(ids.Count);
    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
        }
        finally
        {
            throttler.Release();
        }
    }

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    // same excludeNull code check here

    return excludeNull;
}

Does Semaphore work on threads or on tasks? From what I have read, it looks like Semaphore is for threads and SemaphoreSlim is for tasks.

Is this correct? If so, what is the best way to fix this and limit the number of async IO tasks we make to the database here?

Recommended answer

"Task is an abstraction on threads, and doesn't necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task, which is not a thread. If there's only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. That 1 thread sends off all the async IO tasks itself."

Sort of. I would not say that tasks are related to threads at all. There are actually two kinds of tasks: a delegate task (which is kind of an abstraction of a thread), and a promise task (which has nothing to do with threads).
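The distinction between the two kinds of task can be illustrated with a minimal sketch. It assumes a modern C# compiler with top-level statements; the variable names are made up for illustration. Task.Run produces a delegate task, while TaskCompletionSource produces a promise task that completes only when something signals it.

```csharp
using System;
using System.Threading.Tasks;

// Delegate task: wraps code that is scheduled to run on a thread-pool thread.
Task<int> delegateTask = Task.Run(() => 21 * 2);

// Promise task: only represents a future completion; no code or thread is attached to it.
var tcs = new TaskCompletionSource<int>();
Task<int> promiseTask = tcs.Task;

// Nothing has signaled completion yet, so the promise task is still pending.
bool completedBeforeSignal = promiseTask.IsCompleted;

// Some external event (an I/O completion, a timer, a callback) completes the promise.
tcs.SetResult(42);

int fromDelegate = await delegateTask;
int fromPromise = await promiseTask;
Console.WriteLine($"{fromDelegate} {fromPromise} {completedBeforeSignal}"); // prints "42 42 False"
```

Asynchronous I/O, such as a database call, generally hands back the second kind of task, which is why counting threads says little about how many I/O operations are in flight.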

Regarding SemaphoreSlim, it does limit the concurrency of a block of code (not of threads).
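As a rough illustration of that point, the sketch below guards a block with a SemaphoreSlim and records how many callers are inside it at once. The limit of 3, the 20 callers, and the Task.Delay standing in for I/O are all arbitrary choices made for the example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var gate = new SemaphoreSlim(3);   // at most 3 callers inside the guarded block
var sync = new object();
int inside = 0;                    // callers currently inside the block
int peak = 0;                      // highest value of 'inside' observed

async Task GuardedAsync()
{
    await gate.WaitAsync();        // waits asynchronously; no thread is blocked while waiting
    try
    {
        lock (sync) { inside++; peak = Math.Max(peak, inside); }
        await Task.Delay(20);      // stand-in for asynchronous I/O
    }
    finally
    {
        lock (sync) { inside--; }
        gate.Release();
    }
}

// Start 20 callers at once; the semaphore decides how many run the block concurrently.
await Task.WhenAll(Enumerable.Range(0, 20).Select(_ => GuardedAsync()));
Console.WriteLine($"peak concurrency: {peak}");
```

The reported peak never exceeds 3, no matter how many threads end up running the continuations.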

"I recently started playing with C#, so it looks like my understanding of threads and tasks is not right."

I recommend reading my async intro and best practices articles. Follow up with "There Is No Thread" if you want to know more about how threads aren't really involved.

"I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do"

The current code only throttles the adding of tasks to the list, which happens one at a time anyway. What you want to do is throttle the execution itself:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
  var throttler = new SemaphoreSlim(250);
  var tasks = new List<Task<T>>(ids.Count);

  // invoking multiple id in parallel to get data for each id from database
  for (int i = 0; i < ids.Count; i++)
    tasks.Add(ThrottledExecute(ids[i]));

  // wait for all id response to come back
  var responses = await Task.WhenAll(tasks);

  // same excludeNull code check here
  return excludeNull;

  async Task<T> ThrottledExecute(int id)
  {
    await throttler.WaitAsync().ConfigureAwait(false);
    try {
      return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
    } finally {
      throttler.Release();
    }
  }
}
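To see the difference between the two approaches, here is a self-contained sketch that swaps the database and the Polly policy for a hypothetical FakeQueryAsync helper and measures how many calls overlap. The limit of 5, the 50 ids, and every helper name are assumptions made for the demo, not part of the code above. It also spells out one way to write the null filtering that the snippet elides.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int Limit = 5;
var sync = new object();
int inFlight = 0, peak = 0;

// Hypothetical stand-in for the database call: tracks how many calls overlap.
async Task<string> FakeQueryAsync(int id)
{
    lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
    try
    {
        await Task.Delay(20);                       // simulated I/O latency
        return id % 10 == 0 ? null : $"row-{id}";   // some ids have no data
    }
    finally
    {
        lock (sync) { inFlight--; }
    }
}

async Task<(List<string> Rows, int Peak)> RunAsync(bool holdPermitUntilDone)
{
    lock (sync) { inFlight = 0; peak = 0; }
    var throttler = new SemaphoreSlim(Limit);
    var tasks = new List<Task<string>>();

    foreach (int item in Enumerable.Range(1, 50))
    {
        if (holdPermitUntilDone)
        {
            // Answer's approach: the permit is held for the whole call.
            tasks.Add(ThrottledAsync(item));
        }
        else
        {
            // Question's approach: the permit is released as soon as the call has started.
            await throttler.WaitAsync();
            try { tasks.Add(FakeQueryAsync(item)); }
            finally { throttler.Release(); }
        }
    }

    string[] responses = await Task.WhenAll(tasks);
    // Drop null responses, as the excludeNull loop in the question does.
    return (responses.Where(r => r != null).ToList(), peak);

    async Task<string> ThrottledAsync(int id)
    {
        await throttler.WaitAsync();
        try { return await FakeQueryAsync(id); }
        finally { throttler.Release(); }
    }
}

var (startOnlyRows, startOnlyPeak) = await RunAsync(holdPermitUntilDone: false);
var (throttledRows, throttledPeak) = await RunAsync(holdPermitUntilDone: true);
Console.WriteLine($"permit released on start: peak {startOnlyPeak}, rows {startOnlyRows.Count}");
Console.WriteLine($"permit held until done:   peak {throttledPeak}, rows {throttledRows.Count}");
```

With the permit released right after the call starts, nearly all 50 simulated calls are in flight together. Holding the permit until the awaited call finishes keeps the peak at or below the limit, and both runs return the same 45 non-null rows.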
