延续任务悬挂使用LimitedConcurrencyLevelTaskScheduler当 [英] Continuation Tasks Hanging When Using LimitedConcurrencyLevelTaskScheduler
问题描述
我有我的使用TPL在C#(.NET 4.0)工作。
I've my working on using the TPL in C# (.NET 4.0).
我创建了一个定制的API,以减轻Web请求的创建和下载内容(异步,使用延续任务)。这部分工作正常。
I have created a custom API to ease the creation of web requests and downloading the contents (asynchronously, using continuation tasks). That part is working fine.
当我尝试使用 LimitedConcurrencyLevelTaskScheduler
(在并行编程并在的的任务MSDN文档)递延创建任务。如果你不熟悉的类,它的作用是限制定于任意数量的任务并发度。
The problem I have occurs when I try to use the LimitedConcurrencyLevelTaskScheduler
(found in the Samples for Parallel Programming and in the MSDN documentation for tasks) with deferred task creation. If you're not familiar with that class, all it does is limit the degree of concurrency of tasks scheduled to an arbitrary number.
基本上我想web请求任务链的创建推迟到由 LimitedConcurrencyLevelTaskScheduler
被调度,这样我可以限制并发下载的数量的任务。
Basically I want to defer the creation of web request task chains into a task being scheduled by the LimitedConcurrencyLevelTaskScheduler
so that I can limit the number of concurrent downloads.
<一个href=\"http://social.msdn.microsoft.com/Forums/en/parallelextensions/thread/ce2bc75d-4135-4f54-b4e1-0237062b255b\"相对=nofollow>的建议由圣人斯蒂芬Toub ,推迟一个工作
的创作时,做的最好的事情就是你的API来设计返回 Func键&LT;任务&GT;
或 Func键&LT;任务&LT; TResult&GT;&GT;
。我已经做到了这一点。
As suggested by the sage Stephen Toub, when deferring the creation of a Task
, the best thing to do is to design your API to return a Func<Task>
or Func<Task<TResult>>
. I have done this.
不幸的是,我的程序安排在第一组的并发任务后挂起。说我有我的任务限制在4度的并发性。在这种情况下,4个任务将被启动,然后该程序将挂起。任务永远不会结束。
Unfortunately, my program hangs after scheduling the first set of concurrent tasks. Say I have my tasks limited to 4 degrees of concurrency. In that case, 4 tasks would be started and then the program would hang. The tasks would never complete.
我创建了一个小例子,简单地说明问题。我使用的文件读取,而不是使用的WebRequest
。我已经限制了度并发至1
I have created a minimal example to illustrate the problem simply. I am using file reads instead of using a WebRequest
. I have limited the degrees of concurrency to 1.
class Program
{
static Func<Task> GetReadTask()
{
return () =>
{
Console.WriteLine("Opening file.");
FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);
byte[] buffer = new byte[32];
Console.WriteLine("Beginning read.");
return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
};
}
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(ts);
int[] range = {1, 2, 3};
var tasks = range.Select(number =>
{
Func<Task> getTask = GetReadTask();
return factory.StartNew(() =>
{
var task = getTask();
task.Wait();
});
});
Task.WaitAll(tasks.ToArray());
}
}
要澄清我的意思是它挂起,这是输出的样子。
To clarify what I mean by "it hangs", this is what the output looks like.
Opening file.
Beginning read.
再没有别的印......永远。
And then nothing else is printed... forever.
这是怎么回事任何线索?
Any clues on what is going on?
推荐答案
好问题!
首先,我不知道 LimitedConcurrencyLevelTaskScheduler
是学术上正确的解决方案。为了使该限制的并发请求数N,你必须阻止哪一种失败摆在首位使用APM异步调用的目的N个任务。
Firstly, I'm not sure LimitedConcurrencyLevelTaskScheduler
is the academically correct solution. In order for this to limit the number of concurrent requests to N, you have to block N tasks which kind of defeats the purpose of using APM async calls in the first place.
话虽如此,它更容易一大堆比替代来实现。你将需要有一个工作队列,并保持在飞行的请求的数目的计数,然后根据需要创建工人的任务。这是不平凡得到的权利,如果并发请求数N将是小,具有N阻塞的线程是不是世界的尽头。
Having said that, it is a whole lot easier to implement than the alternative. You would need to have a work queue and keep count of the number of in flight requests, then create worker tasks as required. That's not trivial to get right and if the number N of concurrent requests will be small, having N blocked threads is not the end of the world.
所以,用你的code中的问题是,其他任务中创建的任务使用调度从父任务。其实,这不是与 FromAsync
,因为这些使用底层的APM实现创建任务,真正等等都是有点不同。
So, the problem with your code is that tasks created within other tasks use the scheduler from the parent task. Actually that's not true for tasks created with FromAsync
as these use the underlying APM implementation and so are a bit different.
您创建主要任务
与
return factory.StartNew( () =>
{
var task = getTask();
task.Wait();
}
);
工厂
使用 LimitedConcurrencyLevelTaskScheduler(1)
,所以只有这些任务1可以并发执行,而且一个是等待的任务返回从 getTask()
。
factory
uses the LimitedConcurrencyLevelTaskScheduler( 1 )
, so only 1 of these tasks can execute concurrently and that one is waiting on the task returned from getTask()
.
所以,在 GetReadTask
你叫任务&LT; INT&GT; .Factory.FromAsync
。这运行,因为 FromAsync
不尊重父任务的调度。
So, in GetReadTask
you call Task<int>.Factory.FromAsync
. This runs because FromAsync
doesn't respect the parent task's scheduler.
然后创建一个 .ContinueWith延续(任务=&GT; fileStream.Close())
。这将创建一个不尊重其父母的调度任务。因为 LimitedConcurrencyLevelTaskScheduler
已经执行的任务(一个在主
就是阻止)延续无法运行,你有一个僵局。
Then you create a continuation with .ContinueWith(task => fileStream.Close())
. This creates a task that does respect its parent's scheduler. Since the LimitedConcurrencyLevelTaskScheduler
is already executing a task ( the one in Main
that's blocked ) the continuation cannot run and you have a deadlock.
该解决方案是运行与 TaskScheduler.Default
在一个正常的线程池线程的延续。然后它运行并僵局被打破了。
The solution is to run the continuation on a normal thread pool thread with TaskScheduler.Default
. It then gets to run and the deadlock is broken.
下面是我的解决方案:
static Task QueueReadTask( TaskScheduler ts, int number )
{
Output.Write( "QueueReadTask( " + number + " )" );
return Task.Factory.StartNew( () =>
{
Output.Write( "Opening file " + number + "." );
FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
byte[] buffer = new byte[ 32 ];
var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
var tClose = tRead.ContinueWith( task =>
{
Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
fileStream.Close();
}
, TaskScheduler.Default
);
tClose.Wait();
}
, CancellationToken.None
, TaskCreationOptions.None
, ts
);
}
和主
现在看起来是这样的:
And Main
now looks like this:
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
int[] range = { 1, 2, 3 };
var tasks = range.Select( number =>
{
var task = QueueReadTask( ts, number );
return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
}
)
.ToArray();
Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
Task.WaitAll( tasks );
Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
有几件事情需要注意:
移动 task.Wait()
到 QueueReadTask
使它更加明显,你是阻塞的任务。您可以删除 FromAsync
呼叫和延续,并与正常的同步调用,因为你是无论如何阻止替换它们。
Moving the task.Wait()
into QueueReadTask
makes it more obvious that you are blocking a task. You can remove the FromAsync
call and the continuation and replace them with a normal synchronous call since you are blocking anyway.
此任务从 QueueReadTask
返回可以延续。默认情况下,因为他们继承父任务的调度没有前因的其中一个在默认调度运行。在这种情况下,没有家长的任务,因此使用默认的调度。
The task returned from QueueReadTask
can have continuations. By default, these run under the default scheduler because they inherit the parent task's scheduler not the antecedent's one. In this case there is no parent task, so the default scheduler is used.
这篇关于延续任务悬挂使用LimitedConcurrencyLevelTaskScheduler当的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!