分区:如何在每个分区后添加等待 [英] Partition: How to add a wait after every partition
问题描述
我有一个每分钟接受 20 个请求的 API,之后,我需要等待 1 分钟才能查询它.我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以使用 Partitioner
将我的列表分成 20 个项目/请求,但很快我意识到 Partitioner
不是这样工作的,我的第二个想法是在分区中添加一个 delay
但这也是一个坏主意,据我了解它在每个请求后都会增加一个延迟不需要,相反,我需要在每个 Partition
之后延迟.下面是我的代码:
I have an API that has accepts 20 requests per minute, after that, I need to wait for 1 minute before querying it. I have a list of items (usually 1000+) whose details I need to query from the API, my thought was I could use Partitioner
to partition my list into 20 items/requests but soon I realized the Partitioner
does not work like that, my 2nd thought was adding a delay
in the partition but that too is a bad idea, from my understanding it adds a delay after every request which is not needed, instead, I need a delay after every Partition
. Below is my code:
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
[Optional] int delay)
{
var whenAll = await Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run(async delegate {
var allResponses = new List<V>();
using (partition)
while (partition.MoveNext())
{
allResponses.Add(await body(partition.Current));
await Task.Delay(TimeSpan.FromSeconds(delay));
}
return allResponses;
}, token));
return whenAll.SelectMany(x => x);
}
有谁知道我如何才能做到这一点?
Does anyone know how I can accomplish this?
推荐答案
这是一个 RateLimiter
类,您可以使用它来限制异步操作的频率.它是 RateLimiter
类的更简单实现,位于 这个 答案.
Here is a RateLimiter
class that you could use in order to limit the frequency of the asynchronous operations. It is a simpler implementation of the RateLimiter
class that is found in this answer.
/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter
{
private readonly SemaphoreSlim _semaphore;
private readonly TimeSpan _timeUnit;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
{
if (maxActionsPerTimeUnit < 1)
throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
throw new ArgumentOutOfRangeException(nameof(timeUnit));
_semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
_timeUnit = timeUnit;
}
public async Task WaitAsync(CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
ScheduleSemaphoreRelease();
}
private async void ScheduleSemaphoreRelease()
{
await Task.Delay(_timeUnit).ConfigureAwait(false);
_semaphore.Release();
}
}
使用示例:
List<string> urls = GetUrls();
var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
string[] documents = await Task.WhenAll(urls.Select(async url =>
{
await rateLimiter.WaitAsync();
return await _httpClient.GetStringAsync(url);
}));
这篇关于分区:如何在每个分区后添加等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!