节流异步任务 [英] Throttling asynchronous tasks

查看:183
本文介绍了节流异步任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想运行一堆异步任务,在任何给定的时间有多少任务,可等待完成的极限。

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

假设你有1000个网址,而您只希望有50个请求在同一时间打开;但只要一个请求完成后,你打开列表中的下一个URL连接。这样一来,总有整整50连接,同时打开,直到URL列表耗尽。

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

我也想利用线程的给定数目,如果可能的。

I also want to utilize a given number of threads if possible.

我想出了一个扩展方法, ThrottleTasksAsync 是我想要做什么。是否有一个简单的解决方案已经在那里?我会认为这是一个常见的​​场景。

I came up with an extension method, ThrottleTasksAsync that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.

用法:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

下面是code:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

该方法利用 BlockingCollection SemaphoreSlim ,使其工作。的调节器是一个线程上运行,并且所有的异步任务的其他线程上运行。为了实现并行,我补充说,是传递给maxDegreeOfParallelism参数 Parallel.ForEach 循环再意为,而循环

The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.

老版本是:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

但是,线程池被耗尽快,你不能做异步 / 等待

奖励:
为了解决该问题,在 BlockingCollection 其中一个例外是抛出采取() CompleteAdding ()被调用时,我使用了超时 TryTake 超载。如果我不使用 TryTake 超时,它会破坏使用 BlockingCollection ,因为<$ C的目的$ C> TryTake 不会阻止。有没有更好的办法?理想情况下,会有一个 TakeAsync 方法。

Bonus: To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.

推荐答案

按照要求,这里的code我结束了下去。

As requested, here's the code I ended up going with.

这项工作是建立在一个主从配置,每个主被处理为一个批次。每个工作单元以这种方式排队:

The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

硕士被缓冲一次一个保存为其他以外的进程的工作。每个主站的细节分派通过 masterTransform TransformManyBlock 的工作。 A BatchedJoinBlock 还创建收集细节一批。

Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.

实际的工作在 detailTransform TransformBlock ,异步,150在时间内完成。 BoundedCapacity 设置为300,以确保太多的大师没有得到在链的开始缓冲,同时还留出空间不够详细记录进行排队,让150记录在同一时间进行处理。该块中的对象输出到其目标,因为它是整个取决于它是否是一个详细信息或<$链接过滤C $ C>例外。

The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block outputs an object to its targets, because it's filtered across the links depending on whether it's a Detail or Exception.

batchAction ActionBlock 收集来自所有批次的输出,并进行批量数据库更新,错误日志,等等。对于每个批次

The batchAction ActionBlock collects the output from all the batches, and performs bulk database updates, error logging, etc. for each batch.

将有几个 BatchedJoinBlock S,为每个主。由于每个 ISourceBlock 是输出顺序和每批只接受与一种主相关联的详细记录的数目,批次将按顺序被处理。每块仅输出一组,并且是在完成解除链接。只有最后一批块传播其完成最终的 ActionBlock

There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock is output sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.

数据流网络:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });

这篇关于节流异步任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆