Send parallel requests but only one per host with HttpClient and Polly to gracefully handle 429 responses

Question

Introduction:

I am building a single-node web crawler in a .NET Core console application, simply to validate that URLs return 200 OK. I have a collection of URLs at different hosts, to which I send requests with HttpClient. I am fairly new to using Polly and TPL Dataflow.

Requirements:

  1. I want to support sending multiple HTTP requests in parallel with a configurable MaxDegreeOfParallelism.
  2. I want to limit the number of parallel requests to any given host to 1 (or configurable). This is in order to gracefully handle per-host 429 TooManyRequests responses with a Polly policy. Alternatively, I could maybe use a Circuit Breaker to cancel concurrent requests to the same host on receipt of one 429 response and then proceed one-at-a-time to that specific host?
  3. I am perfectly fine with not using TPL Dataflow at all in favor of maybe using a Polly Bulkhead or some other mechanism for throttled parallel requests, but I am not sure what that configuration would look like in order to implement requirement #2.

Current implementation:

My current implementation works, except that I often see that I'll have x parallel requests to the same host return 429 at about the same time... Then, they all pause for the retry policy... Then, they all slam the same host again at the same time often still receiving 429s. Even if I distribute multiple instances of the same host evenly throughout the queue, my URL collection is overweighted with a few specific hosts that still start generating 429s eventually.

After receiving a 429, I think I only want to send one concurrent request to that host going forward to respect the remote host and pursue 200s.

Validator method:

public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
    var validator = new TransformBlock<Uri, bool>(
        async u =>
        {
            // Dispose each response so its pooled connection is released
            using var response = await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
            return response.IsSuccessStatusCode;
        },
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = MaxDegreeOfParallelism}
    );
    foreach (var url in urls)
        await validator.SendAsync(url, cancellationToken);
    validator.Complete();
    var validUrlCount = 0;
    while (await validator.OutputAvailableAsync(cancellationToken))
    {
        if(await validator.ReceiveAsync(cancellationToken))
            validUrlCount++;
    }
    await validator.Completion;
    return validUrlCount;
}

The Polly policy applied to the HttpClient instance used in GetValidCount() above.

IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
    .HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(3,
        (retryCount, response, context) =>
            // RetryAfter is null when the 429 response carries no Retry-After header
            response.Result?.Headers.RetryAfter?.Delta ?? TimeSpan.FromMilliseconds(120),
        async (response, timespan, retryCount, context) =>
        {
            // log stuff
        });
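A side note on the sleep-duration provider above: it only reads `RetryAfter.Delta`, but HTTP allows `Retry-After` to be either a number of seconds or an HTTP date, in which case `Delta` is null and the policy silently falls back to 120 ms. The following is a minimal sketch of a helper that handles both forms; `RetryAfterHelper` and `GetRetryDelay` are hypothetical names, not part of Polly or HttpClient, and the 120 ms fallback is simply the value from the question.

```csharp
using System;
using System.Net.Http.Headers;

public static class RetryAfterHelper
{
    // Hypothetical helper: converts a Retry-After header into a wait time.
    // `retryAfter` may be null when the response has no Retry-After header.
    public static TimeSpan GetRetryDelay(
        RetryConditionHeaderValue retryAfter, TimeSpan fallback, DateTimeOffset now)
    {
        // Form 1: "Retry-After: 120" (a number of seconds)
        if (retryAfter?.Delta is TimeSpan delta)
            return delta;

        // Form 2: "Retry-After: Fri, 31 Dec 1999 23:59:59 GMT" (an HTTP date)
        if (retryAfter?.Date is DateTimeOffset date)
        {
            var untilDate = date - now;
            // Treat a date that has already passed as "retry now"
            return untilDate > TimeSpan.Zero ? untilDate : TimeSpan.Zero;
        }

        // No usable header: fall back to the caller's default delay
        return fallback;
    }
}
```

In the policy, the sleep-duration provider would then become `(retryCount, response, context) => RetryAfterHelper.GetRetryDelay(response.Result?.Headers.RetryAfter, TimeSpan.FromMilliseconds(120), DateTimeOffset.UtcNow)`. Passing `now` explicitly keeps the helper deterministic and easy to test.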

Question:

How can I modify or replace this solution so that it also satisfies requirement #2?

Recommended answer

Here is a method that creates a TransformBlock which prevents concurrent execution of messages that have the same key. The key of each message is obtained by invoking the supplied keySelector function. Messages with the same key are processed sequentially with respect to each other (not in parallel). The key is also passed as an argument to the transform function, because it can be useful in some cases.

public static TransformBlock<TInput, TOutput>
    CreateExclusivePerKeyTransformBlock<TInput, TKey, TOutput>(
    Func<TInput, TKey, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var maxDopSemaphore
        = new SemaphoreSlim(maxDOP == -1 ? Int32.MaxValue : maxDOP);

    var perKeySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>(
        keyComparer);

    // The degree of parallelism is controlled by the semaphores
    dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

    // An exclusive scheduler is needed for preserving the processing order
    dataflowBlockOptions.TaskScheduler =
        new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

    var block = new TransformBlock<TInput, TOutput>(async item =>
    {
        var key = keySelector(item);
        var perKeySemaphore = perKeySemaphores
            .GetOrAdd(key, _ => new SemaphoreSlim(1));
        await perKeySemaphore.WaitAsync(internalCTS.Token).ConfigureAwait(false);
        try
        {
            await maxDopSemaphore.WaitAsync(internalCTS.Token)
                .ConfigureAwait(false);
            try
            {
                // Invoke the transform using the provided TaskScheduler
                return await Task.Factory.StartNew(() => transform(item, key),
                    internalCTS.Token, TaskCreationOptions.DenyChildAttach,
                    taskScheduler).Unwrap().ConfigureAwait(false);
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                internalCTS.Cancel(); // The block has failed
                throw;
            }
            finally
            {
                maxDopSemaphore.Release();
            }
        }
        finally
        {
            perKeySemaphore.Release();
        }
    }, dataflowBlockOptions);

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
    return block;
}

Usage example:

var validator = CreateExclusivePerKeyTransformBlock<Uri, string, bool>(
    async (uri, host) =>
    {
        // Dispose each response so its pooled connection is released
        using var response = await _httpClient.GetAsync(uri,
            HttpCompletionOption.ResponseHeadersRead, token);
        return response.IsSuccessStatusCode;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 30,
        CancellationToken = token,
    },
    keySelector: uri => uri.Host,
    keyComparer: StringComparer.OrdinalIgnoreCase);

All execution options are supported (MaxDegreeOfParallelism, BoundedCapacity, CancellationToken, EnsureOrdered etc).

Below is an overload of CreateExclusivePerKeyTransformBlock that accepts a synchronous delegate, plus another method (with its own synchronous overload) that returns an ActionBlock-like ITargetBlock instead of a TransformBlock, with the same behavior.

public static TransformBlock<TInput, TOutput>
    CreateExclusivePerKeyTransformBlock<TInput, TKey, TOutput>(
    Func<TInput, TKey, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateExclusivePerKeyTransformBlock(
        (item, key) => Task.FromResult(transform(item, key)),
        dataflowBlockOptions, keySelector, keyComparer);
}

// An ITargetBlock is similar to an ActionBlock
public static ITargetBlock<TInput>
    CreateExclusivePerKeyActionBlock<TInput, TKey>(
    Func<TInput, TKey, Task> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateExclusivePerKeyTransformBlock(async (item, key) =>
        { await action(item, key).ConfigureAwait(false); return (object)null; },
        dataflowBlockOptions, keySelector, keyComparer);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    return block;
}

public static ITargetBlock<TInput>
    CreateExclusivePerKeyActionBlock<TInput, TKey>(
    Action<TInput, TKey> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    return CreateExclusivePerKeyActionBlock(
        (item, key) => { action(item, key); return Task.CompletedTask; },
        dataflowBlockOptions, keySelector, keyComparer);
}
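Requirement #3 of the question leaves room for not using TPL Dataflow at all. For comparison, here is a minimal sketch of the same idea (a per-key SemaphoreSlim acquired before a global SemaphoreSlim) built only on `Task.WhenAll`. `PerKeyThrottle` and `ForEachExclusivePerKeyAsync` are hypothetical names; unlike the Dataflow block, this sketch has no BoundedCapacity or cancellation support and starts one task per item up front, so it assumes a finite input list.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PerKeyThrottle
{
    // Hypothetical helper: runs `action` for every item with at most
    // `maxDegreeOfParallelism` actions in flight overall and at most one per key.
    public static async Task<TResult[]> ForEachExclusivePerKeyAsync<TItem, TKey, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, TKey> keySelector,
        Func<TItem, Task<TResult>> action,
        int maxDegreeOfParallelism,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        var globalSemaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var perKeySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>(
            keyComparer ?? EqualityComparer<TKey>.Default);

        async Task<TResult> RunOneAsync(TItem item)
        {
            var perKeySemaphore = perKeySemaphores.GetOrAdd(
                keySelector(item), _ => new SemaphoreSlim(1));
            // Take the per-key slot first, so items queued behind a busy key do not
            // hold a global slot while they wait (the order CreateExclusivePerKeyTransformBlock uses)
            await perKeySemaphore.WaitAsync().ConfigureAwait(false);
            try
            {
                await globalSemaphore.WaitAsync().ConfigureAwait(false);
                try { return await action(item).ConfigureAwait(false); }
                finally { globalSemaphore.Release(); }
            }
            finally { perKeySemaphore.Release(); }
        }

        // Starts one task per item up front; results come back in input order
        return await Task.WhenAll(items.Select(item => RunOneAsync(item)))
            .ConfigureAwait(false);
    }
}
```

For the crawler in the question, the key selector would be `u => u.Host` with `StringComparer.OrdinalIgnoreCase`, the action would perform the GET and return `IsSuccessStatusCode`, and the valid count would be `results.Count(ok => ok)`.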


Caution: CreateExclusivePerKeyTransformBlock allocates one SemaphoreSlim per key and keeps a reference to each one until the block is eventually garbage collected. This could be an issue if the number of distinct keys is huge. There is an implementation of a less allocation-heavy async lock here, which internally stores only the SemaphoreSlims that are currently in use (plus a small pool of released SemaphoreSlims that can be reused); it could replace the ConcurrentDictionary<TKey, SemaphoreSlim> used by this implementation.
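The linked implementation is not reproduced here. As a rough illustration of the idea only, the following hypothetical `KeyedAsyncLock<TKey>` reference-counts each key's SemaphoreSlim (holders plus waiters) and removes the entry from its dictionary as soon as nobody holds or awaits it. Unlike the implementation described above, this sketch does not pool released semaphores.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: per-key async mutual exclusion that keeps a key's
// SemaphoreSlim only while some caller holds or awaits it.
public sealed class KeyedAsyncLock<TKey>
{
    private sealed class Entry
    {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int RefCount; // holders + waiters; only touched under lock (_entries)
    }

    private readonly Dictionary<TKey, Entry> _entries;

    public KeyedAsyncLock(IEqualityComparer<TKey> comparer = null)
        => _entries = new Dictionary<TKey, Entry>(comparer ?? EqualityComparer<TKey>.Default);

    // Number of keys that currently have a live entry
    public int Count { get { lock (_entries) return _entries.Count; } }

    public async Task<IDisposable> LockAsync(TKey key, CancellationToken cancellationToken = default)
    {
        Entry entry;
        lock (_entries)
        {
            if (!_entries.TryGetValue(key, out entry))
                _entries[key] = entry = new Entry();
            entry.RefCount++; // register as a waiter before awaiting
        }
        try
        {
            await entry.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // Canceled before acquiring: drop the reference but do not release the semaphore
            Release(key, entry, releaseSemaphore: false);
            throw;
        }
        return new Releaser(this, key, entry);
    }

    private void Release(TKey key, Entry entry, bool releaseSemaphore)
    {
        lock (_entries)
        {
            // Last reference gone: forget the key so the semaphore can be collected
            if (--entry.RefCount == 0) _entries.Remove(key);
        }
        if (releaseSemaphore) entry.Semaphore.Release();
    }

    private sealed class Releaser : IDisposable
    {
        private KeyedAsyncLock<TKey> _owner;
        private readonly TKey _key;
        private readonly Entry _entry;

        public Releaser(KeyedAsyncLock<TKey> owner, TKey key, Entry entry)
        {
            _owner = owner;
            _key = key;
            _entry = entry;
        }

        // Interlocked.Exchange makes a second Dispose call a no-op
        public void Dispose() =>
            Interlocked.Exchange(ref _owner, null)?.Release(_key, _entry, releaseSemaphore: true);
    }
}
```

Inside the transform lambda of CreateExclusivePerKeyTransformBlock, the `GetOrAdd` / `WaitAsync` / `Release` sequence on `perKeySemaphore` could then be replaced by `using (await keyedLock.LockAsync(key, internalCTS.Token)) { ... }`, where `keyedLock` is a hypothetical `KeyedAsyncLock<TKey>` created with `keyComparer`.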
