Throttling asynchronous tasks

Question
I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.
Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.
I also want to utilize a given number of threads if possible.
I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.
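For comparison, the simpler pattern I've seen most often for this kind of throttling awaits SemaphoreSlim.WaitAsync directly inside each task, with no blocking collection or extra thread. This is a minimal sketch under my own naming (ForEachThrottledAsync and maxConcurrent are illustrative, not part of the extension method above):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Throttle
{
    // Runs taskToRun over every item, allowing at most maxConcurrent
    // tasks to be in flight at any one time.
    public static async Task<TResult[]> ForEachThrottledAsync<T, TResult>(
        IEnumerable<T> items, int maxConcurrent, Func<T, Task<TResult>> taskToRun)
    {
        var semaphore = new SemaphoreSlim(maxConcurrent);

        var tasks = items.Select(async item =>
        {
            await semaphore.WaitAsync();      // asynchronously wait for a free slot
            try
            {
                return await taskToRun(item); // do the real work
            }
            finally
            {
                semaphore.Release();          // free the slot for the next item
            }
        });

        // Task.WhenAll preserves the input order in the result array.
        return await Task.WhenAll(tasks);
    }
}
```

Because WaitAsync is awaited rather than blocked on, no thread is parked while waiting for a slot; the trade-off versus the code below is that all the lambdas are created up front, which may matter for very large inputs.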
Usage:
class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}
Here's the code:
static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        // ConcurrentBag, not List<T>: the Parallel.ForEach body below adds
        // from multiple threads, and List<T>.Add is not thread-safe.
        var taskList = new ConcurrentBag<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            _ =>
            {
                Enumerable_T item;

                if (blockingQueue.TryTake(out item, 100))
                {
                    taskList.Add(
                        // Run the task
                        taskToRun(item)
                        .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        })
                    );
                }
            });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}
The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.
The old version was:
foreach (var master in ...)
{
    var details = ...;

    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });

    // Perform the final batch updates here
}
However, the thread pool gets exhausted fast, and you can't do async/await.
Bonus:

To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.
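As far as I know, the closest thing to the wished-for TakeAsync is TPL Dataflow's BufferBlock&lt;T&gt;, whose ReceiveAsync and OutputAvailableAsync give an awaitable take without timeout polling. A minimal single-consumer sketch (DrainAsync is an illustrative name, not part of the code above):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowTake
{
    // Drains a BufferBlock with the awaitable take pattern.
    public static async Task<List<int>> DrainAsync(BufferBlock<int> buffer)
    {
        var taken = new List<int>();

        // OutputAvailableAsync completes with false once the block is
        // completed and empty, so no timeout or exception handling is
        // needed to detect the end. (Safe as written for one consumer;
        // multiple consumers would need to catch the race on ReceiveAsync.)
        while (await buffer.OutputAvailableAsync())
            taken.Add(await buffer.ReceiveAsync());

        return taken;
    }
}
```

The producer side posts items and then calls buffer.Complete(), which plays the role of CompleteAdding() here.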
Answer
As requested, here's the code I ended up going with.
The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:
var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;
Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.
The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block outputs an object to its targets, because it's filtered across the links depending on whether it's a Detail or an Exception.
The batchAction ActionBlock collects the output from all the batches, and performs bulk database updates, error logging, etc. for each batch.
There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock is output sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.
The dataflow network:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail =>
{
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();
        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });