分区:如何在每个分区后添加等待 [英] Partition: How to add a wait after every partition

查看:54
本文介绍了分区:如何在每个分区后添加等待的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个每分钟接受 20 个请求的 API,之后,我需要等待 1 分钟才能查询它.我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以使用 Partitioner 将我的列表分成 20 个项目/请求,但很快我意识到 Partitioner 不是这样工作的,我的第二个想法是在分区中添加一个 delay 但这也是一个坏主意,据我了解它在每个请求后都会增加一个延迟不需要,相反,我需要在每个 Partition 之后延迟.下面是我的代码:

I have an API that has accepts 20 requests per minute, after that, I need to wait for 1 minute before querying it. I have a list of items (usually 1000+) whose details I need to query from the API, my thought was I could use Partitioner to partition my list into 20 items/requests but soon I realized the Partitioner does not work like that, my 2nd thought was adding a delay in the partition but that too is a bad idea, from my understanding it adds a delay after every request which is not needed, instead, I need a delay after every Partition. Below is my code:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

有谁知道我如何才能做到这一点?

Does anyone know how I can accomplish this?

推荐答案

这是一个 RateLimiter 类,您可以使用它来限制异步操作​​的频率.它是 RateLimiter 类的更简单实现,位于 这个 答案.

Here is a RateLimiter class that you could use in order to limit the frequency of the asynchronous operations. It is a simpler implementation of the RateLimiter class that is found in this answer.

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}

使用示例:

List<string> urls = GetUrls();

var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

这篇关于分区:如何在每个分区后添加等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆