Task Parallel Library - Custom Task Schedulers

Question

I have a requirement to fire off web service requests to an online API, and I thought that Parallel Extensions would be a good fit for my needs.

The web service in question is designed to be called repeatedly, but it charges you if you go over a certain number of calls per second. I obviously want to minimize my charges, so I was wondering if anyone has seen a TaskScheduler that can cope with the following requirements:

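  1. Limit the number of tasks scheduled per time span. I suppose that if the number of requests exceeds this limit, it would need to either throw away tasks or possibly block? (to stop a backlog of tasks building up)
  2. Detect whether the same request is already queued in the scheduler but not yet executed, and if so, return the first task instead of queuing a second one.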

Do people feel that these are the sorts of responsibilities a task scheduler should be dealing with, or am I barking up the wrong tree? If you have alternatives, I am open to suggestions.

Answer

I agree with others that TPL Dataflow sounds like a good solution for this.

To limit the processing, you could create a TransformBlock that doesn't actually transform the data in any way. It only delays each item if that item arrives too soon after the previous one:

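using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // TPL Dataflow (System.Threading.Tasks.Dataflow package)

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    // Time at which the previous item was let through (MinValue = no item yet).
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
        {
            // Wait until at least 'delay' has passed since the previous item.
            var waitTime = lastItem + delay - DateTime.UtcNow;
            if (waitTime > TimeSpan.Zero)
                await Task.Delay(waitTime);

            lastItem = DateTime.UtcNow;

            return x; // pass the item through unchanged
        },
        // BoundedCapacity = 1: the block holds at most one item, so senders must
        // wait instead of building up a backlog. The default MaxDegreeOfParallelism
        // of 1 means lastItem is never updated concurrently.
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}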
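`CreateDelayBlock` measures the gap with `DateTime.UtcNow`, which follows the system clock and can jump if that clock is adjusted. The sketch below is a variant that swaps in `System.Diagnostics.Stopwatch`, a monotonic timer, and uses a nullable `TimeSpan` to mean "no item yet". The class name `DelayBlockSketch` and the `Stopwatch` substitution are assumptions for illustration, not part of the original answer.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DelayBlockSketch
{
    // Same pass-through idea as CreateDelayBlock, timed with a monotonic Stopwatch.
    public static IPropagatorBlock<T, T> Create<T>(TimeSpan delay)
    {
        var clock = Stopwatch.StartNew();
        TimeSpan? lastRelease = null; // null until the first item has gone through

        return new TransformBlock<T, T>(
            async x =>
            {
                if (lastRelease is TimeSpan previous)
                {
                    // Wait out whatever is left of 'delay' since the previous item.
                    var waitTime = previous + delay - clock.Elapsed;
                    if (waitTime > TimeSpan.Zero)
                        await Task.Delay(waitTime);
                }
                lastRelease = clock.Elapsed;
                return x;
            },
            // BoundedCapacity = 1 keeps upstream senders waiting; the default
            // MaxDegreeOfParallelism of 1 means items are handled one at a time.
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    }
}
```

Suppose you feed it three items with `SendAsync` and read them back with `Receive`. The second and third items should then arrive roughly one `delay` apart.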

Then create a method that produces the data (for example integers starting from 0):

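static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    // SendAsync waits while the target is full and returns false once the
    // target has been completed (delayBlock.Complete()), which ends the loop.
    while (await target.SendAsync(i))
        i++;
}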

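Producer is written asynchronously: it awaits SendAsync. So if the target block can't accept an item right now, the producer waits without blocking a thread or dropping the item. With BoundedCapacity = 1, delayBlock is full whenever it is already holding an item.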
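This also covers the question's first requirement, which asked whether to drop or wait. When a bounded block is full, `Post` returns `false` immediately, so the caller could drop the item. `SendAsync` instead returns a task that completes only once the item is accepted. The minimal sketch below uses a `BufferBlock<int>` with `BoundedCapacity = 1` as a stand-in for a full delay block; `BackpressureDemo` is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BackpressureDemo
{
    // Shows how Post and SendAsync behave when a bounded block is full.
    public static async Task<(bool PostAccepted, bool SendWasPending, bool SendAccepted)> RunAsync()
    {
        // Holds at most one item and has no consumer linked, so it fills up at once.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        await buffer.SendAsync(0);                // takes the only free slot

        bool postAccepted = buffer.Post(1);       // full: Post gives up and returns false

        Task<bool> send = buffer.SendAsync(2);    // full: SendAsync hands back a pending task
        bool sendWasPending = !send.IsCompleted;

        buffer.Receive();                         // remove item 0 to free the slot
        bool sendAccepted = await send;           // the waiting item 2 is now accepted
        return (postAccepted, sendWasPending, sendAccepted);
    }
}
```

Dropping items whenever `Post` returns `false` gives the "throw away" behaviour. Awaiting `SendAsync`, as `Producer` does, gives the "wait" behaviour without tying up a thread.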

Then write a consumer method:

static void Consumer(int i)
{
    Console.WriteLine(i);
}
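In the web-service scenario from the question, the consumer would make an asynchronous HTTP call rather than write to the console. The minimal sketch below shows an async consumer. It assumes a hypothetical `CallApiAsync` that only simulates 100 ms of latency and records how many calls overlap. `ActionBlock` accepts a `Func<int, Task>` and treats an item as finished when the returned task completes. With `MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded`, several calls can therefore be in flight at once. `AsyncConsumerSketch` is likewise an illustrative name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class AsyncConsumerSketch
{
    static readonly object Sync = new object();
    static int _inFlight;
    static int _peakInFlight;

    // Hypothetical stand-in for the real web service request.
    static async Task CallApiAsync(int i)
    {
        lock (Sync) { _inFlight++; _peakInFlight = Math.Max(_peakInFlight, _inFlight); }
        await Task.Delay(100); // simulated network latency
        lock (Sync) { _inFlight--; }
    }

    // Sends 'count' items to an async consumer and returns the peak number of overlapping calls.
    public static async Task<int> RunAsync(int count)
    {
        var consumerBlock = new ActionBlock<int>(
            (Func<int, Task>)CallApiAsync,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        for (int i = 0; i < count; i++)
            await consumerBlock.SendAsync(i);

        consumerBlock.Complete();
        await consumerBlock.Completion; // all simulated calls have finished
        lock (Sync) { return _peakInFlight; }
    }
}
```

In the full pipeline this block would take the place of `consumerBlock`, linked after `delayBlock`. Calls would then still start at most once per delay interval, even though they may overlap.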

And finally, link it all together and start it up:

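var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

// The cast selects the synchronous Action<int> constructor overload
// rather than the Func<int, Task> one.
var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// PropagateCompletion: completing delayBlock also completes consumerBlock once it drains.
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Runs until delayBlock.Complete() is called; until then the producer keeps sending.
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);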

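Here, delayBlock passes on at most one item every 500 ms (that is, at most 2 calls per second), while the Consumer() method can run multiple times in parallel. To finish processing, call delayBlock.Complete(). SendAsync in Producer then returns false, which ends its loop, and PropagateCompletion completes consumerBlock once it has drained.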
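As written, the linking code never calls `Complete()`, so `Task.WaitAll` waits forever. The minimal sketch below shows a finite run, under three assumptions:

- The hypothetical `DelayFor` helper converts a calls-per-second limit into a spacing. 2 calls per second gives the 500 ms used above.
- The delay logic is copied from `CreateDelayBlock` so the example stands alone.
- The consumer only records items.

`ThrottledRun` and `RunAsync` are illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ThrottledRun
{
    // Hypothetical helper: minimum spacing that keeps calls under the limit,
    // e.g. 2 calls per second -> 500 ms.
    public static TimeSpan DelayFor(int callsPerSecond) =>
        TimeSpan.FromTicks(TimeSpan.TicksPerSecond / callsPerSecond);

    // Pushes 'count' items through a throttled pipeline, then shuts it down and
    // returns the consumed items (sorted) plus the total elapsed time.
    public static async Task<(int[] Items, TimeSpan Elapsed)> RunAsync(int count, TimeSpan delay)
    {
        DateTime lastItem = DateTime.MinValue;
        var delayBlock = new TransformBlock<int, int>(async x =>
        {
            var waitTime = lastItem + delay - DateTime.UtcNow;
            if (waitTime > TimeSpan.Zero)
                await Task.Delay(waitTime);
            lastItem = DateTime.UtcNow;
            return x;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var consumed = new ConcurrentBag<int>();
        var consumerBlock = new ActionBlock<int>(
            (Action<int>)consumed.Add,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
        delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
            await delayBlock.SendAsync(i);   // waits while delayBlock is full

        delayBlock.Complete();               // no more input
        await consumerBlock.Completion;      // completes after every item is consumed
        return (consumed.OrderBy(x => x).ToArray(), stopwatch.Elapsed);
    }
}
```

Because the link uses `PropagateCompletion = true`, completing `delayBlock` is enough. `consumerBlock.Completion` finishes only after the last item has been consumed.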

If you want to add some caching for your #2, you could create another TransformBlock to do that work and link it to the other blocks.
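Requirement #2 asks for the already-queued request to be returned instead of a duplicate being issued. One option is to coalesce in-flight requests by key. The sketch below swaps in a different technique from the TransformBlock suggested above: a `ConcurrentDictionary` of `Lazy<Task<TResult>>`, so callers asking for the same key at the same time share one `Task`. `RequestCoalescer` and `GetAsync` are hypothetical names. Each entry is removed when its call finishes, so this deduplicates pending requests rather than caching results; keeping the entries would turn it into a cache.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a second request for a key that is still in flight
// receives the first request's Task instead of triggering a new call.
sealed class RequestCoalescer<TKey, TResult>
{
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TResult>>> _inFlight =
        new ConcurrentDictionary<TKey, Lazy<Task<TResult>>>();
    private readonly Func<TKey, Task<TResult>> _call;

    public RequestCoalescer(Func<TKey, Task<TResult>> call) => _call = call;

    public Task<TResult> GetAsync(TKey key)
    {
        // GetOrAdd may build a spare Lazy under a race, but only the stored one
        // is ever evaluated, so _call runs at most once per in-flight key.
        var entry = _inFlight.GetOrAdd(key,
            k => new Lazy<Task<TResult>>(() => CallAndRemoveAsync(k)));
        return entry.Value;
    }

    private async Task<TResult> CallAndRemoveAsync(TKey key)
    {
        try { return await _call(key); }
        finally { _inFlight.TryRemove(key, out _); } // later requests start a fresh call
    }
}
```

This only helps when requests for the same key are issued concurrently, for example by several producers or by a block with MaxDegreeOfParallelism greater than 1.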
