任务并行库 - 自定义任务调度程序 [英] Task Parallel Library - Custom Task Schedulers
问题描述
我需要将 Web 服务请求发送到在线 api,我认为 Parallel Extensions 非常适合我的需求.
I have a requirement to fire off web service requests to an online api and I thought that Parallel Extensions would be a good fit for my needs.
有问题的网络服务旨在重复调用,但有一种机制,如果您每秒调用超过一定数量,则向您收费.我显然想尽量减少我的费用,所以想知道是否有人见过可以满足以下要求的 TaskScheduler:
The web service in question is designed to be called repeatedly, but has a mechanism that charges you if you got over a certain number of calls per second. I obviously want to minimize my charges and so was wondering if anyone has seen a TaskScheduler that can cope with the following requirements:
- 限制每个时间跨度安排的任务数量.我想如果请求的数量超过了这个限制,那么它需要扔掉任务还是可能会阻塞?(停止任务的积压)
- 检测相同的请求是否已经在要执行的调度程序中但尚未执行,如果是,则不将第二个任务排队,而是返回第一个.
人们是否认为这些是任务调度程序应该处理的职责,还是我在找错树?如果您有其他选择,我愿意接受建议.
Do people feel that these are the sorts of responsibilities a task scheduler should be dealing with or am i barking up the wrong tree? If you have alternatives I am open to suggestions.
推荐答案
我同意其他人的看法,即 TPL Dataflow 听起来是一个很好的解决方案.
I agree with others that TPL Dataflow sounds like a good solution for this.
为了限制处理,您可以创建一个 TransformBlock
,它实际上不会以任何方式转换数据,如果它在前一个数据之后过早到达,它只会延迟它:
To limit the processing, you could create a TransformBlock
that doesn't actually transform the data in any way, it just delays it if it arrived too soon after the previous data:
static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
DateTime lastItem = DateTime.MinValue;
return new TransformBlock<T, T>(
async x =>
{
var waitTime = lastItem + delay - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
await Task.Delay(waitTime);
lastItem = DateTime.UtcNow;
return x;
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
然后创建一个产生数据的方法(例如从 0 开始的整数):
Then create a method that produces the data (for example integers starting from 0):
static async Task Producer(ITargetBlock<int> target)
{
int i = 0;
while (await target.SendAsync(i))
i++;
}
它是异步写入的,因此如果目标块现在无法处理项目,它将等待.
It's written asynchronously, so that if the target block isn't able to process the items right now, it will wait.
然后写一个消费者方法:
Then write a consumer method:
static void Consumer(int i)
{
Console.WriteLine(i);
}
最后,将它们链接在一起并启动它:
And finally, link it all together and start it up:
var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));
var consumerBlock = new ActionBlock<int>(
(Action<int>)Consumer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
这里,delayBlock
最多每 500 毫秒接受一项,并且 Consumer()
方法可以并行运行多次.要完成处理,请调用 delayBlock.Complete()
.
Here, delayBlock
will accept at most one item every 500 ms and the Consumer()
method can run multiple times in parallel. To finish processing, call delayBlock.Complete()
.
如果您想为 #2 添加一些缓存,您可以创建另一个 TransformBlock
在那里完成工作并将其链接到其他块.
If you want to add some caching per your #2, you could create another TransformBlock
do the work there and link it to the other blocks.
这篇关于任务并行库 - 自定义任务调度程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!