Send parallel requests but only one per host with HttpClient and Polly to gracefully handle 429 responses
Question
Introduction: I am building a single-node web crawler to simply validate that URLs return 200 OK in a .NET Core console application. I have a collection of URLs at different hosts to which I am sending requests with HttpClient. I am fairly new to using Polly and TPL Dataflow.
Requirements:
- I want to support sending multiple HTTP requests in parallel with a configurable MaxDegreeOfParallelism.
- I want to limit the number of parallel requests to any given host to 1 (or make it configurable). This is in order to gracefully handle per-host 429 TooManyRequests responses with a Polly policy. Alternatively, I could maybe use a Circuit Breaker to cancel concurrent requests to the same host on receipt of one 429 response and then proceed one-at-a-time to that specific host?
- I am perfectly fine with not using TPL Dataflow at all in favor of maybe using a Polly Bulkhead or some other mechanism for throttled parallel requests, but I am not sure what that configuration would look like in order to implement requirement #2.
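For reference, requirement #2 on its own amounts to a per-host "bulkhead" of size 1, which can be approximated with plain SemaphoreSlim instances keyed by host name. This is a minimal sketch only, not the accepted answer's implementation; the PerHostGate and RunExclusivePerHostAsync names are illustrative:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch (not from the question): a per-host gate of size 1,
// so at most one request is in flight per host at any time.
public static class PerHostGate
{
    // One semaphore per host; created lazily and shared by all callers.
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Gates =
        new ConcurrentDictionary<string, SemaphoreSlim>(StringComparer.OrdinalIgnoreCase);

    public static async Task<T> RunExclusivePerHostAsync<T>(
        Uri uri, Func<Uri, Task<T>> request)
    {
        var gate = Gates.GetOrAdd(uri.Host, _ => new SemaphoreSlim(1, 1));
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            return await request(uri).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }
}
```

Requests to different hosts still run fully in parallel here; only same-host calls queue behind each other.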
Current implementation:
My current implementation works, except that I often see x parallel requests to the same host return 429 at about the same time... Then they all pause for the retry policy... Then they all slam the same host again at the same time, often still receiving 429s. Even if I distribute multiple instances of the same host evenly throughout the queue, my URL collection is overweighted with a few specific hosts that still eventually start generating 429s.
After receiving a 429, I think I only want to send one concurrent request to that host going forward, to respect the remote host and pursue 200s.
Validator Method:
public async Task<int> GetValidCount(IEnumerable<Uri> urls, CancellationToken cancellationToken)
{
    var validator = new TransformBlock<Uri, bool>(
        async u => (await _httpClient.GetAsync(u, HttpCompletionOption.ResponseHeadersRead, cancellationToken)).IsSuccessStatusCode,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism });

    foreach (var url in urls)
        await validator.SendAsync(url, cancellationToken);
    validator.Complete();

    var validUrlCount = 0;
    while (await validator.OutputAvailableAsync(cancellationToken))
    {
        if (await validator.ReceiveAsync(cancellationToken))
            validUrlCount++;
    }
    await validator.Completion;
    return validUrlCount;
}
The Polly policy applied to the HttpClient instance used in GetValidCount() above:
IAsyncPolicy<HttpResponseMessage> waitAndRetryTooManyRequests = Policy
    .HandleResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(3,
        (retryCount, response, context) =>
            // Note the null-conditional on RetryAfter too: the header may be absent
            response.Result?.Headers.RetryAfter?.Delta ?? TimeSpan.FromMilliseconds(120),
        async (response, timespan, retryCount, context) =>
        {
            // log stuff
        });
Question:
How can I modify or replace this solution so that it also satisfies requirement #2?
Answer
Here is a method that creates a TransformBlock which prevents concurrent execution for messages with the same key. The key of each message is obtained by invoking the supplied keySelector function. Messages with the same key are processed sequentially relative to each other (not in parallel). The key is also passed as an argument to the transform function, because it can be useful in some cases.
public static TransformBlock<TInput, TOutput>
    CreateExclusivePerKeyTransformBlock<TInput, TKey, TOutput>(
    Func<TInput, TKey, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;
    var maxDopSemaphore
        = new SemaphoreSlim(maxDOP == -1 ? Int32.MaxValue : maxDOP);
    var perKeySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>(
        keyComparer);

    // The degree of parallelism is controlled by the semaphores
    dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
    // An exclusive scheduler is needed for preserving the processing order
    dataflowBlockOptions.TaskScheduler =
        new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

    var block = new TransformBlock<TInput, TOutput>(async item =>
    {
        var key = keySelector(item);
        var perKeySemaphore = perKeySemaphores
            .GetOrAdd(key, _ => new SemaphoreSlim(1));
        await perKeySemaphore.WaitAsync(internalCTS.Token).ConfigureAwait(false);
        try
        {
            await maxDopSemaphore.WaitAsync(internalCTS.Token)
                .ConfigureAwait(false);
            try
            {
                // Invoke the transform using the provided TaskScheduler
                return await Task.Factory.StartNew(() => transform(item, key),
                    internalCTS.Token, TaskCreationOptions.DenyChildAttach,
                    taskScheduler).Unwrap().ConfigureAwait(false);
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                internalCTS.Cancel(); // The block has failed
                throw;
            }
            finally
            {
                maxDopSemaphore.Release();
            }
        }
        finally
        {
            perKeySemaphore.Release();
        }
    }, dataflowBlockOptions);

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value
    return block;
}
Usage example:
var validator = CreateExclusivePerKeyTransformBlock<Uri, string, bool>(
    async (uri, host) =>
    {
        return (await _httpClient.GetAsync(uri,
            HttpCompletionOption.ResponseHeadersRead, token)).IsSuccessStatusCode;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 30,
        CancellationToken = token,
    },
    keySelector: uri => uri.Host,
    keyComparer: StringComparer.OrdinalIgnoreCase);
All execution options are supported (MaxDegreeOfParallelism, BoundedCapacity, CancellationToken, EnsureOrdered etc).
Below is an overload of CreateExclusivePerKeyTransformBlock that accepts a synchronous delegate, and another method + overload that return an ITargetBlock instead of a TransformBlock, with the same behavior.
public static TransformBlock<TInput, TOutput>
    CreateExclusivePerKeyTransformBlock<TInput, TKey, TOutput>(
    Func<TInput, TKey, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateExclusivePerKeyTransformBlock(
        (item, key) => Task.FromResult(transform(item, key)),
        dataflowBlockOptions, keySelector, keyComparer);
}

// An ITargetBlock is similar to an ActionBlock
public static ITargetBlock<TInput>
    CreateExclusivePerKeyActionBlock<TInput, TKey>(
    Func<TInput, TKey, Task> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateExclusivePerKeyTransformBlock(async (item, key) =>
        { await action(item, key).ConfigureAwait(false); return (object)null; },
        dataflowBlockOptions, keySelector, keyComparer);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    return block;
}

public static ITargetBlock<TInput>
    CreateExclusivePerKeyActionBlock<TInput, TKey>(
    Action<TInput, TKey> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions,
    Func<TInput, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    return CreateExclusivePerKeyActionBlock(
        (item, key) => { action(item, key); return Task.CompletedTask; },
        dataflowBlockOptions, keySelector, keyComparer);
}
Caution: This class allocates one SemaphoreSlim per key, and keeps a reference to it until the class instance is finally garbage collected. This could be an issue if the number of distinct keys is huge. There is an implementation of a less allocatey async lock here, which stores internally only the SemaphoreSlims that are currently in use (plus a small pool of released SemaphoreSlims that can be reused), and which could replace the ConcurrentDictionary<TKey, SemaphoreSlim> used by this implementation.
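One way the growth problem can be addressed (a rough sketch of the general idea only, not the implementation the answer links to; the KeyedLock name is illustrative) is to reference-count each entry and retire it as soon as its last holder or waiter releases it:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch: a keyed async lock that removes a key's SemaphoreSlim
// from the dictionary once no caller holds it or is waiting on it.
public class KeyedLock<TKey>
{
    private class Entry
    {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int RefCount; // holders + waiters currently interested in this key
    }

    private readonly Dictionary<TKey, Entry> _entries;
    private readonly object _sync = new object();

    public KeyedLock(IEqualityComparer<TKey> comparer = null)
        => _entries = new Dictionary<TKey, Entry>(
            comparer ?? EqualityComparer<TKey>.Default);

    public async Task LockAsync(TKey key)
    {
        Entry entry;
        lock (_sync)
        {
            if (!_entries.TryGetValue(key, out entry))
                _entries[key] = entry = new Entry();
            // Registered before awaiting, so the entry can't be retired
            // while this caller is still waiting on its semaphore.
            entry.RefCount++;
        }
        await entry.Semaphore.WaitAsync().ConfigureAwait(false);
    }

    public void Unlock(TKey key)
    {
        Entry entry;
        lock (_sync)
        {
            entry = _entries[key];
            if (--entry.RefCount == 0)
                _entries.Remove(key); // last user: the semaphore becomes garbage
        }
        entry.Semaphore.Release();
    }
}
```

This trades the lock-free ConcurrentDictionary for a coarse lock around the ref-counting bookkeeping, which is usually negligible next to the cost of an HTTP request.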