Limiting the amount of concurrent tasks in .NET 4.5

Problem description

Observe the following function:

public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator,
    CreateTaskDelegate<TTaskSeed> createTask,
    OnTaskErrorDelegate<TTaskSeed> onError = null,
    OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
{
    Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) =>
    {
        if (onError != null)
        {
            onError(exc, taskSeed);
        }
    };

    Action<Task> onDone = t =>
    {
        var taskSeed = (TTaskSeed)t.AsyncState;
        if (t.Exception != null)
        {
            onFailed(t.Exception, taskSeed);
        }
        else if (onSuccess != null)
        {
            onSuccess(t, taskSeed);
        }
    };

    var enumerator = taskSeedGenerator.GetEnumerator();
    Task task = null;
    while (enumerator.MoveNext())
    {
        if (task == null)
        {
            try
            {
                task = createTask(enumerator.Current);
                Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
            }
            catch (Exception exc)
            {
                onFailed(exc, enumerator.Current);
            }
        }
        else
        {
            task = task.ContinueWith((t, taskSeed) =>
            {
                onDone(t);
                var res = createTask((TTaskSeed)taskSeed);
                Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
                return res;
            }, enumerator.Current).TaskUnwrap();
        }
    }

    if (task != null)
    {
        task = task.ContinueWith(onDone);
    }

    return task;
}

Where TaskUnwrap is a state-preserving version of the standard Task.Unwrap:

public static class Extensions
{
    public static Task TaskUnwrap(this Task<Task> task, object state = null)
    {
        return task.Unwrap().ContinueWith((t, _) =>
        {
            if (t.Exception != null)
            {
                throw t.Exception;
            }
        }, state ?? task.AsyncState);
    }
}

The RunInOrderAsync method runs N tasks asynchronously but sequentially, one after another. In effect, it runs the tasks created from the given seeds with a concurrency limit of 1.

Let us assume that the tasks created from the seeds by the createTask delegate do not themselves fan out into multiple concurrent tasks.

Now, I would like to throw in the maxConcurrencyLevel parameter, so the function signature would look like this:

Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel,
  IEnumerable<TTaskSeed> taskSeedGenerator,
  CreateTaskDelegate<TTaskSeed> createTask,
  OnTaskErrorDelegate<TTaskSeed> onError = null,
  OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class

And here I am a bit stuck.

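Stack Overflow has questions like these:

  • System.Threading.Tasks - Limit the number of concurrent Tasks (http://stackoverflow.com/questions/2898609/system-threading-tasks-limit-the-number-of-concurrent-tasks)
  • Task based processing with a limit for concurrent task number with .NET 4.5 and C# (http://stackoverflow.com/questions/12366608/task-based-processing-with-a-limit-for-concurrent-task-number-with-net-4-5-and)
  • .NET TPL: Limited Concurrency Level Task Scheduler with task priority (http://stackoverflow.com/questions/9315937/net-tpl-limited-concurrency-level-task-scheduler-with-task-priority)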

These basically propose two ways to attack the problem:

  1. Using Parallel.ForEach with ParallelOptions specifying the MaxDegreeOfParallelism property value as equal to the desired max concurrency level.
  2. Using a custom TaskScheduler with the desired MaximumConcurrencyLevel value.

The second approach doesn't cut it, because all the tasks involved must use the same task scheduler instance. For that, every method that returns a Task must have an overload accepting the custom TaskScheduler instance. Unfortunately, Microsoft is not very consistent in that respect. For instance, SqlConnection.OpenAsync does not accept such an argument (but TaskFactory.FromAsync does).
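To make the scheduler approach concrete, here is a minimal sketch using ConcurrentExclusiveSchedulerPair, which ships with .NET 4.5 and takes a maximum concurrency level. The class name SchedulerThrottleSketch and the method MeasurePeakConcurrency are made up for illustration, and Thread.Sleep stands in for synchronous work. It shows the limit being honored only for delegates that are started through that scheduler, which is exactly why it does not help with methods that hand back an already running Task.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question.
public static class SchedulerThrottleSketch
{
    // Starts `count` blocking work items on a scheduler limited to
    // maxConcurrency and returns the highest number seen running at once.
    public static int MeasurePeakConcurrency(int maxConcurrency, int count)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0;
        int peak = 0;
        object gate = new object();

        Task[] tasks = Enumerable.Range(0, count).Select(_ => factory.StartNew(() =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            Thread.Sleep(20); // stand-in for synchronous work (assumption)
            lock (gate) { running--; }
        })).ToArray();

        Task.WaitAll(tasks);
        return peak;
    }
}
```

Calling SchedulerThrottleSketch.MeasurePeakConcurrency(2, 8) should never report more than 2. Note that the scheduler only throttles the delegates it executes; an I/O-bound Task returned by something like SqlConnection.OpenAsync is not a delegate queued to this scheduler, so it stays outside the limit.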

The first approach implies that I will have to convert tasks to actions, something like this:

() => t.Wait()

I am not sure it is a good idea, but I will be glad to get more input on that.
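For reference, this is roughly what the first approach would look like. It is only a sketch: ParallelForEachSketch and RunBlocking are hypothetical names, and createTask is simplified to a plain Func<T, Task> instead of the CreateTaskDelegate from the question. It does cap the number of tasks in flight, but each in-flight task pins a worker thread inside Wait().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question.
public static class ParallelForEachSketch
{
    // Blocks the caller until every item has been processed, running at most
    // maxConcurrency tasks at a time.
    public static void RunBlocking<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        Parallel.ForEach(items, options, item =>
        {
            // The worker thread is blocked here until the task completes,
            // which is what turns the asynchronous task into an "action".
            createTask(item).Wait();
        });
    }
}
```

The cost is that up to maxConcurrency threads sit idle while the underlying I/O is pending, and the call itself is synchronous, so the asynchronous nature of the original function is lost.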

Another approach is to utilize TaskFactory.ContinueWhenAny, but that is messy.
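To illustrate that idea without blocking, here is a sketch that uses Task.WhenAny with async/await rather than TaskFactory.ContinueWhenAny (a substitution on my part; both are in .NET 4.5). WhenAnyThrottleSketch and RunAsync are hypothetical names, and createTask is again assumed to be a plain Func<T, Task>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question.
public static class WhenAnyThrottleSketch
{
    public static async Task RunAsync<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
    {
        var all = new List<Task>();      // every task ever started
        var inFlight = new List<Task>(); // tasks currently occupying a slot

        foreach (var item in items)
        {
            if (inFlight.Count >= maxConcurrency)
            {
                // Wait (without blocking a thread) for any slot to free up.
                Task finished = await Task.WhenAny(inFlight);
                inFlight.Remove(finished);
            }

            Task task = createTask(item);
            all.Add(task);
            inFlight.Add(task);
        }

        // Surfaces failures from any of the started tasks.
        await Task.WhenAll(all);
    }
}
```

This works, but the bookkeeping is manual and every Task.WhenAny call registers a continuation on each in-flight task again, which is part of why it feels messy compared to a semaphore.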

Any ideas?

EDIT 1

I would like to clarify the reasons for wanting the limit. Our tasks ultimately execute SQL statements against the same SQL server. What we want is a way to limit the number of concurrent outgoing SQL statements. It is entirely possible that other SQL statements will be executing concurrently from other pieces of code, but this one is a batch processor and could potentially flood the server.

Now, be advised that although we are talking about the same SQL server, there are numerous databases on that server. So it is not about limiting the number of open SQL connections to the same database, because the database may not be the same at all.

That is why doomsday solutions like ThreadPool.SetMaxThreads() are irrelevant.

Now, about SqlConnection.OpenAsync. It was made asynchronous for a reason: it might make a roundtrip to the server and thus might be subject to network latency and the other lovely side effects of a distributed environment. As such, it is no different from other async methods that do accept a TaskScheduler parameter. I tend to think that not accepting one is just a bug.

EDIT 2

I would like to preserve the asynchronous spirit of the original function. Hence I wish to avoid any explicit blocking solutions.

EDIT 3

Thanks to @fsimonazzi's answer I now have a working implementation of the desired functionality. Here is the code:

        var sem = new SemaphoreSlim(maxConcurrencyLevel);
        var tasks = new List<Task>();

        var enumerator = taskSeedGenerator.GetEnumerator();
        while (enumerator.MoveNext())
        {
            tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
            {
                Task task = null;
                try
                {
                    task = createTask((TTaskSeed)taskSeed);
                    if (task != null)
                    {
                        Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
                        task = task.ContinueWith(t =>
                        {
                            sem.Release();
                            onDone(t);
                        });
                    }
                }
                catch (Exception exc)
                {
                    sem.Release();
                    onFailed(exc, (TTaskSeed)taskSeed);
                }
                return task;
            }, enumerator.Current).TaskUnwrap());
        }

        return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());

Solution

You can use a semaphore to throttle the processing. Using the WaitAsync() method you get the asynchrony you expected. Something like this (error handling removed for brevity):

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
{
    using (var sem = new SemaphoreSlim(maxConcurrency))
    {
        var tasks = new List<Task>();

        foreach (var item in items)
        {
            await sem.WaitAsync();
            var task = createTask(item).ContinueWith(t => sem.Release());
            tasks.Add(task);
        }

        await Task.WhenAll(tasks);
    }
}

Edited to remove bug where the semaphore could be disposed before all release operations had a chance to be executed.
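Since the error handling was left out above, here is one possible way to put it back. This is a sketch under my own assumptions, not the answerer's code: SemaphoreThrottleSketch, RunAsync and RunOneAsync are hypothetical names, and onError is a simplified stand-in for the OnTaskErrorDelegate from the question. The semaphore slot is released in a finally block, so it is returned whether createTask throws synchronously, the task faults, or it succeeds.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class SemaphoreThrottleSketch
{
    public static async Task RunAsync<T>(int maxConcurrency, IEnumerable<T> items,
        Func<T, Task> createTask, Action<Exception, T> onError = null)
    {
        using (var sem = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();

            foreach (var item in items)
            {
                await sem.WaitAsync(); // asynchronously wait for a free slot
                tasks.Add(RunOneAsync(sem, item, createTask, onError));
            }

            // All slots are released before WhenAll completes, so the
            // semaphore is not disposed while a release is still pending.
            await Task.WhenAll(tasks);
        }
    }

    private static async Task RunOneAsync<T>(SemaphoreSlim sem, T item,
        Func<T, Task> createTask, Action<Exception, T> onError)
    {
        try
        {
            await createTask(item);
        }
        catch (Exception exc)
        {
            if (onError == null) throw; // no handler: let WhenAll surface it
            onError(exc, item);
        }
        finally
        {
            sem.Release(); // always give the slot back
        }
    }
}
```

With an onError callback supplied, a failing item is reported and the remaining items keep running. Without one, the failure is rethrown from the final Task.WhenAll after all started tasks have completed.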
