延续任务悬挂使用LimitedConcurrencyLevelTask​​Scheduler当 [英] Continuation Tasks Hanging When Using LimitedConcurrencyLevelTaskScheduler

查看:983
本文介绍了延续任务悬挂使用LimitedConcurrencyLevelTask​​Scheduler当的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有我的使用TPL在C#(.NET 4.0)工作。

I've my working on using the TPL in C# (.NET 4.0).

我创建了一个定制的API,以减轻Web请求的创建和下载内容(异步,使用延续任务)。这部分工作正常。

I have created a custom API to ease the creation of web requests and downloading the contents (asynchronously, using continuation tasks). That part is working fine.

当我尝试使用 LimitedConcurrencyLevelTask​​Scheduler (在并行编程并在的的任务MSDN文档)递延创建任务。如果你不熟悉的类,它的作用是限制定于任意数量的任务并发度。

The problem I have occurs when I try to use the LimitedConcurrencyLevelTaskScheduler (found in the Samples for Parallel Programming and in the MSDN documentation for tasks) with deferred task creation. If you're not familiar with that class, all it does is limit the degree of concurrency of tasks scheduled to an arbitrary number.

基本上我想web请求任务链的创建推迟到由 LimitedConcurrencyLevelTask​​Scheduler 被调度,这样我可以限制并发下载的数量的任务。

Basically I want to defer the creation of web request task chains into a task being scheduled by the LimitedConcurrencyLevelTaskScheduler so that I can limit the number of concurrent downloads.

<一个href=\"http://social.msdn.microsoft.com/Forums/en/parallelextensions/thread/ce2bc75d-4135-4f54-b4e1-0237062b255b\"相对=nofollow>的建议由圣人斯蒂芬Toub ,推迟一个工作的创作时,做的最好的事情就是你的API来设计返回 Func键&LT;任务&GT; Func键&LT;任务&LT; TResult&GT;&GT; 。我已经做到了这一点。

As suggested by the sage Stephen Toub, when deferring the creation of a Task, the best thing to do is to design your API to return a Func<Task> or Func<Task<TResult>>. I have done this.

不幸的是,我的程序安排在第一组的并发任务后挂起。说我有我的任务限制在4度的并发性。在这种情况下,4个任务将被启动,然后该程序将挂起。任务永远不会结束。

Unfortunately, my program hangs after scheduling the first set of concurrent tasks. Say I have my tasks limited to 4 degrees of concurrency. In that case, 4 tasks would be started and then the program would hang. The tasks would never complete.

我创建了一个小例子,简单地说明问题。我使用的文件读取,而不是使用的WebRequest 。我已经限制了度并发至1

I have created a minimal example to illustrate the problem simply. I am using file reads instead of using a WebRequest. I have limited the degrees of concurrency to 1.

class Program
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");

            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);

            byte[] buffer = new byte[32];

            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);

        int[] range = {1, 2, 3};

        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });

        Task.WaitAll(tasks.ToArray());
    }
}

要澄清我的意思是它挂起,这是输出的样子。

To clarify what I mean by "it hangs", this is what the output looks like.

Opening file.
Beginning read.

再没有别的印......永远。

And then nothing else is printed... forever.

这是怎么回事任何线索?

Any clues on what is going on?

推荐答案

好问题!

首先,我不知道 LimitedConcurrencyLevelTask​​Scheduler 是学术上正确的解决方案。为了使该限制的并发请求数N,你必须阻止哪一种失败摆在首位使用APM异步调用的目的N个任务。

Firstly, I'm not sure LimitedConcurrencyLevelTaskScheduler is the academically correct solution. In order for this to limit the number of concurrent requests to N, you have to block N tasks which kind of defeats the purpose of using APM async calls in the first place.

话虽如此,它更容易一大堆比替代来实现。你将需要有一个工作队列,并保持在飞行的请求的数目的计数,然后根据需要创建工人的任务。这是不平凡得到的权利,如果并发请求数N将是小,具有N阻塞的线程是不是世界的尽头。

Having said that, it is a whole lot easier to implement than the alternative. You would need to have a work queue and keep count of the number of in flight requests, then create worker tasks as required. That's not trivial to get right and if the number N of concurrent requests will be small, having N blocked threads is not the end of the world.

所以,用你的code中的问题是,其他任务中创建的任务使用调度从父任务。其实,这不是与 FromAsync ,因为这些使用底层的APM实现创建任务,真正等等都是有点不同。

So, the problem with your code is that tasks created within other tasks use the scheduler from the parent task. Actually that's not true for tasks created with FromAsync as these use the underlying APM implementation and so are a bit different.

您创建主要任务

return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);

工厂使用 LimitedConcurrencyLevelTask​​Scheduler(1),所以只有这些任务1可以并发执行,而且一个是等待的任务返回从 getTask()

factory uses the LimitedConcurrencyLevelTaskScheduler( 1 ), so only 1 of these tasks can execute concurrently and that one is waiting on the task returned from getTask().

所以,在 GetReadTask 你叫任务&LT; INT&GT; .Factory.FromAsync 。这运行,因为 FromAsync 不尊重父任务的调度。

So, in GetReadTask you call Task<int>.Factory.FromAsync. This runs because FromAsync doesn't respect the parent task's scheduler.

然后创建一个 .ContinueWith延续(任务=&GT; fileStream.Close())。这将创建一个不尊重其父母的调度任务。因为 LimitedConcurrencyLevelTask​​Scheduler 已经执行的任务(一个在就是阻止)延续无法运行,你有一​​个僵局。

Then you create a continuation with .ContinueWith(task => fileStream.Close()). This creates a task that does respect its parent's scheduler. Since the LimitedConcurrencyLevelTaskScheduler is already executing a task ( the one in Main that's blocked ) the continuation cannot run and you have a deadlock.

该解决方案是运行与 TaskScheduler.Default 在一个正常的线程池线程的延续。然后它运行并僵局被打​​破了。

The solution is to run the continuation on a normal thread pool thread with TaskScheduler.Default. It then gets to run and the deadlock is broken.

下面是我的解决方案:

static Task QueueReadTask( TaskScheduler ts, int number )
{
    Output.Write( "QueueReadTask( " + number + " )" );

    return Task.Factory.StartNew( () =>
        {
            Output.Write( "Opening file " + number + "." );

            FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );

            byte[] buffer = new byte[ 32 ];

            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );

            var tClose = tRead.ContinueWith( task =>
                    {
                        Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
                        fileStream.Close();
                    }
                    , TaskScheduler.Default
                );

            tClose.Wait();
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}

现在看起来是这样的:

And Main now looks like this:

static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );

    int[] range = { 1, 2, 3 };

    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );

            return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
        }
    )
    .ToArray();

    Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );

    Task.WaitAll( tasks );

    Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}

有几件事情需要注意:

移动 task.Wait() QueueReadTask 使它更加明显,你是阻塞的任务。您可以删除 FromAsync 呼叫和延续,并与正常的同步调用,因为你是无论如何阻止替换它们。

Moving the task.Wait() into QueueReadTask makes it more obvious that you are blocking a task. You can remove the FromAsync call and the continuation and replace them with a normal synchronous call since you are blocking anyway.

此任务从 QueueReadTask 返回可以延续。默认情况下,因为他们继承父任务的调度没有前因的其中一个在默认调度运行。在这种情况下,没有家长的任务,因此使用默认的调度。

The task returned from QueueReadTask can have continuations. By default, these run under the default scheduler because they inherit the parent task's scheduler not the antecedent's one. In this case there is no parent task, so the default scheduler is used.

这篇关于延续任务悬挂使用LimitedConcurrencyLevelTask​​Scheduler当的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆