Throttling asynchronous tasks

Problem description

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

I also want to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume this is a common scenario.

Usage:

using System;
using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Here is the code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        // Several Parallel.ForEach workers add to this collection at the same time,
        // so it must be thread-safe (List<T>.Add is not).
        var taskList = new ConcurrentQueue<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Enqueue(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect: simulate extra work before the slot is freed
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

The method uses BlockingCollection and SemaphoreSlim. The throttler runs on a separate thread and waits on the semaphore before queuing each item. The Parallel.ForEach workers take items off the queue and start the async tasks, and each task's continuation releases the semaphore. To control parallelism, I added a maxDegreeOfParallelism parameter. It is passed to a Parallel.ForEach loop that I re-purposed as a while loop.
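For comparison, here is a minimal sketch of the throttling on its own. The SemaphoreSlim is sufficient by itself if the loop awaits WaitAsync() before it starts each task and every task calls Release() when it finishes. This is a commonly used alternative pattern, not the method above. ThrottleAsync and the demo workload are hypothetical names, and the code assumes C# 9+ top-level statements. The sketch drops maxDegreeOfParallelism. It assumes the tasks are I/O-bound and so hold no thread while they await.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs taskToRun for every item with at most
// maxConcurrentTasks calls in flight; the next call starts as soon as one finishes.
async Task<TResult[]> ThrottleAsync<T, TResult>(
    IEnumerable<T> items, int maxConcurrentTasks, Func<T, Task<TResult>> taskToRun)
{
    var semaphore = new SemaphoreSlim(maxConcurrentTasks);
    var tasks = new List<Task<TResult>>();

    foreach (var item in items)
    {
        // Wait asynchronously for a free slot; no thread is blocked here.
        await semaphore.WaitAsync();
        tasks.Add(RunAndReleaseAsync(item));
    }

    // Task.WhenAll returns the results in input order.
    return await Task.WhenAll(tasks);

    async Task<TResult> RunAndReleaseAsync(T item)
    {
        try { return await taskToRun(item); }
        finally { semaphore.Release(); } // free the slot even if the task fails
    }
}

// Demo: 20 simulated jobs, at most 5 at a time, recording the peak concurrency.
var gate = new object();
int running = 0, peak = 0;
var results = await ThrottleAsync(Enumerable.Range(1, 20), 5, async i =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(50); // stand-in for an HTTP request
    lock (gate) { running--; }
    return i * 10;
});
Console.WriteLine($"{results.Length} results, peak concurrency {peak}");
```

Because the loop itself awaits a free slot, exactly as many tasks are in flight as the semaphore allows, until the input runs out.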

The old version was:

foreach (var master in ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

But the thread pool gets exhausted quickly, and you can't use async/await inside Parallel.ForEach.
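On .NET 6 and later, which postdates this question, Parallel.ForEachAsync lifts the async/await restriction. It awaits each async body and caps how many run at once, and a body that is awaiting does not hold a thread-pool thread. Below is a minimal sketch of the old inner loop under that assumption. The details array and the Task.Delay workload are hypothetical placeholders for the real detail records and I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical detail records; in the original these come from each master row.
var details = Enumerable.Range(1, 30).ToArray();
var processed = new ConcurrentBag<int>();

// Parallel.ForEachAsync (.NET 6+) awaits each async body and
// runs at most MaxDegreeOfParallelism bodies at once.
await Parallel.ForEachAsync(
    details,
    new ParallelOptions { MaxDegreeOfParallelism = 15 },
    async (detail, cancellationToken) =>
    {
        // Stand-in for real async I/O on one detail record.
        await Task.Delay(10, cancellationToken);
        processed.Add(detail);
    });

// Every record has been processed at this point; the final batch update would go here.
Console.WriteLine($"processed {processed.Count} records");
```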

Bonus: BlockingCollection.Take() throws an InvalidOperationException once CompleteAdding() has been called and the collection is empty. To get around this, I'm using the TryTake overload with a timeout. Without the timeout, TryTake returns immediately instead of blocking, which would defeat the purpose of using a BlockingCollection. Is there a better way? Ideally, there would be a TakeAsync method.
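Two existing APIs cover this case. BlockingCollection<T>.GetConsumingEnumerable() blocks for each item. It ends normally, without throwing, once CompleteAdding() has been called and the collection is empty. For an awaitable take, System.Threading.Channels (in-box since .NET Core 3.0) provides ChannelReader<T>.ReadAllAsync(). Below is a minimal sketch of the latter. It assumes a single producer, a single consumer, and hypothetical int items, and the capacity of 50 is an arbitrary choice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: WriteAsync waits asynchronously while 50 items are queued.
var channel = Channel.CreateBounded<int>(50);

// Producer: writes every item, then marks the channel complete
// (the counterpart of BlockingCollection.CompleteAdding()).
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 1000; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete();
});

// Consumer: ReadAllAsync waits asynchronously for each item and simply ends
// once the writer has completed and the channel is empty; no exception, no timeout.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    received.Add(item);

await producer;
Console.WriteLine($"received {received.Count} items");
```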

Solution

As suggested, use TPL Dataflow.

A TransformBlock<TInput, TOutput> may be what you're looking for.

You set MaxDegreeOfParallelism to limit how many strings can be transformed in parallel (i.e., how many URLs can be downloaded at once). You then post the URLs to the block. When you're done, you call Complete() to tell the block that no more items are coming, and then you fetch the responses.

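using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Download(url), HttpResponse and urls come from the caller's own code (not shown).
var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach (var url in urls)
{
    downloader.Post(url);
    // or: await downloader.SendAsync(url);
}

// Signal that no more URLs are coming, then wait for every download to finish.
downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    // process responses
}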
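To check the behavior without a network, here is a self-contained variant of the snippet above. A hypothetical FakeDownload stands in for Download(url) and returns a string instead of an HttpResponse. It also records how many downloads are running at once. The sketch assumes that System.Threading.Tasks.Dataflow is referenced (it is distributed separately on some targets) and that C# 9 top-level statements are available.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int running = 0, peak = 0;

// Hypothetical stand-in for Download(url): simulates a request and tracks concurrency.
async Task<string> FakeDownload(string url)
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(20); // simulated network latency
    lock (gate) { running--; }
    return "response for " + url;
}

var urls = Enumerable.Range(1, 200).Select(i => $"https://example.com/{i}").ToList();

var downloader = new TransformBlock<string, string>(
    url => FakeDownload(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
var buffer = new BufferBlock<string>();
downloader.LinkTo(buffer);

foreach (var url in urls)
    await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

// Outputs keep the input order because EnsureOrdered defaults to true.
IList<string> responses = buffer.TryReceiveAll(out var received) ? received : new List<string>();
Console.WriteLine($"{responses.Count} responses, peak concurrency {peak}");
```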


Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all of its output items (the HttpResponse objects) have been consumed, await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block. We then wait for the downloader to complete and inspect the buffer block.
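An alternative is to skip the BufferBlock and drain the TransformBlock's output while it runs. This uses the OutputAvailableAsync and TryReceive extension methods from DataflowBlock. Because every output item is received, awaiting Completion no longer hangs. The sketch below works under the same assumptions as before: a hypothetical FakeDownload, string responses instead of HttpResponse, and C# 9 top-level statements. The URLs are fed from a separate task so that consumption overlaps with production.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for Download(url).
static async Task<string> FakeDownload(string url)
{
    await Task.Delay(10); // simulated network latency
    return "response for " + url;
}

var urls = Enumerable.Range(1, 100).Select(i => $"https://example.com/{i}").ToList();
var downloader = new TransformBlock<string, string>(
    url => FakeDownload(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

// Feed the block on a separate task so output can be consumed at the same time.
var feeder = Task.Run(async () =>
{
    foreach (var url in urls)
        await downloader.SendAsync(url);
    downloader.Complete();
});

// OutputAvailableAsync returns false only once the block has completed
// and its output buffer is empty.
var responses = new List<string>();
while (await downloader.OutputAvailableAsync())
{
    while (downloader.TryReceive(out var response))
        responses.Add(response);
}

await feeder;
await downloader.Completion; // does not hang: every output item has been received
Console.WriteLine($"{responses.Count} responses");
```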
