How to execute tasks in parallel but not more than N tasks per T seconds?

Question

I need to run many tasks in parallel, as fast as possible. But if my program runs more than 30 tasks per second, it will be blocked. How can I ensure that no more than 30 tasks run in any 1-second interval?

In other words, we must prevent a new task from starting if 30 tasks have already completed within the last 1-second interval.

My ugly possible solution:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

Recommended answer

User code should never have to control how tasks are scheduled directly. For one thing, it can't: controlling how tasks run is the job of the TaskScheduler. When user code calls .Start(), it simply queues the task for execution, normally on the thread pool. await doesn't start anything; it only waits for a task that is already executing to complete.

The TaskScheduler samples show how to create limited-concurrency schedulers, but again, there are better, higher-level options.

The question's code doesn't throttle the queued tasks anyway; it only limits how many of them can be awaited at a time. They are all running already. This is similar to batching the results of a previous asynchronous operation in a pipeline, allowing only a limited number of messages to pass to the next stage.
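To make the point about already-running tasks concrete, here is a minimal sketch. The names HotTaskDemo, WorkAsync, CountStartedWithTasks and CountStartedWithFactories are hypothetical and only serve the illustration. It compares a list of Task objects, which have all started by the time the list is built, with a list of Func<Task> factories, which start nothing until they are invoked. Only the second form leaves room for a throttle to decide when each operation begins.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Number of operations that have started so far.
    public static int Started;

    // Hypothetical stand-in for real work. The counter is incremented
    // synchronously, before the first await yields.
    public static async Task WorkAsync()
    {
        Interlocked.Increment(ref Started);
        await Task.Delay(100);
    }

    // Builds a list of tasks and reports how many had started
    // before any of them was awaited.
    public static int CountStartedWithTasks(int count)
    {
        Started = 0;
        var tasks = new List<Task>();
        for (int i = 0; i < count; i++)
        {
            tasks.Add(WorkAsync()); // calling the method starts the operation
        }
        int startedBeforeAwait = Started;
        Task.WaitAll(tasks.ToArray());
        return startedBeforeAwait;
    }

    // Builds a list of factories and reports how many operations
    // had started before any factory was invoked.
    public static int CountStartedWithFactories(int count)
    {
        Started = 0;
        var factories = new List<Func<Task>>();
        for (int i = 0; i < count; i++)
        {
            factories.Add(WorkAsync); // stores the method, does not call it
        }
        return Started;
    }

    public static void Main()
    {
        Console.WriteLine(CountStartedWithTasks(5));     // prints 5
        Console.WriteLine(CountStartedWithFactories(5)); // prints 0
    }
}
```

A throttled version of the question's Process method would therefore most likely need to accept something like List<Func<Task>> rather than List<Task>.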

ActionBlock with a delay

The easy, out-of-the-box way would be to use an ActionBlock with a limited MaxDegreeOfParallelism, to ensure that no more than N operations run at the same time. If we know how long each operation takes, we can add a bit of delay to ensure we don't overshoot the throttle limit.

In this case, 7 concurrent workers each perform at most 4 requests per second, for a maximum of 28 requests per second in total. The BoundedCapacity means that the block accepts at most 7 items at a time; beyond that, awaiting downloader.SendAsync does not complete until space frees up. This way we avoid flooding the ActionBlock if the operations take too long.

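// Assumes httpClient (an HttpClient) and urls (a sequence of strings) are defined elsewhere.
var downloader = new ActionBlock<string>(
        async url =>
        {
            // Wait 250 ms before each request, so one worker makes at most 4 requests per second.
            await Task.Delay(250);
            var response = await httpClient.GetStringAsync(url);
            // Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity = 7 }
);

// Start posting to the downloader
foreach (var item in urls)
{
    await downloader.SendAsync(item);
}
// Signal that no more items will be posted, then wait for the queued ones to finish.
downloader.Complete();
await downloader.Completion;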
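The 250 ms figure can be derived rather than guessed. The sketch below is only an illustration; ThrottleMath and MinDelayMs are hypothetical names. It assumes that every worker waits the full delay before each request, as in the snippet above. With W workers sharing a limit of N requests per interval, each worker may start at most floor(N / W) requests per interval, so the delay has to be at least the interval divided by that number, rounded up.

```csharp
using System;

public static class ThrottleMath
{
    // Smallest per-request delay (in ms) that keeps `workers` concurrent workers
    // at or below `maxPerInterval` request starts in any window of `intervalMs`.
    public static int MinDelayMs(int maxPerInterval, int intervalMs, int workers)
    {
        if (workers <= 0 || workers > maxPerInterval)
        {
            throw new ArgumentOutOfRangeException(nameof(workers));
        }
        int perWorker = maxPerInterval / workers;        // integer division rounds down
        return (intervalMs + perWorker - 1) / perWorker; // integer ceiling of intervalMs / perWorker
    }

    public static void Main()
    {
        Console.WriteLine(MinDelayMs(30, 1000, 7)); // prints 250
        Console.WriteLine(MinDelayMs(30, 1000, 5)); // prints 167
    }
}
```

For 7 workers and a limit of 30 per second this gives 250 ms, which matches the delay used above and explains why the total comes out at 28 rather than 30 requests per second.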

ActionBlock with a SemaphoreSlim

Another option would be to combine this with a SemaphoreSlim whose permits are replenished periodically by a timer.

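// Up to 30 permits are available in each 1-second window.
var semaphore = new SemaphoreSlim(30, 30);

// Top the semaphore back up to 30 permits on every tick.
var refreshTimer = new Timer(_ =>
{
    var missing = 30 - semaphore.CurrentCount;
    if (missing > 0)
    {
        semaphore.Release(missing);
    }
});

var downloader = new ActionBlock<string>(
        async url =>
        {
            // Take one permit per request. It is not released here;
            // only the timer hands permits back.
            await semaphore.WaitAsync();
            var response = await httpClient.GetStringAsync(url);
            // Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 }
);

// Start the timer right before we start posting
refreshTimer.Change(1000, 1000);
foreach(....)
{
    // Post items with downloader.SendAsync, as in the previous example.
}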
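A timer that refills permits on a fixed tick enforces the limit per tick window, not per arbitrary 1-second interval: a burst just before a tick and another just after it can land within the same second. If the limit really applies to any 1-second interval, as the question states, one possible variation is to hand each permit back only after the interval has elapsed since the operation completed. The following is a minimal sketch of that idea, not part of the original answer; RateLimiter, RunAsync and RateLimiterDemo are hypothetical names, and the 10 ms delay stands in for real work.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class RateLimiter
{
    private readonly SemaphoreSlim _permits;
    private readonly TimeSpan _interval;

    public RateLimiter(int maxPerInterval, TimeSpan interval)
    {
        _permits = new SemaphoreSlim(maxPerInterval, maxPerInterval);
        _interval = interval;
    }

    // Starts the operation once a permit is free. The permit is returned
    // only after `interval` has passed since the operation completed.
    public async Task RunAsync(Func<Task> operation)
    {
        await _permits.WaitAsync();
        try
        {
            await operation();
        }
        finally
        {
            // Fire-and-forget: release the permit after the interval elapses.
            _ = Task.Delay(_interval).ContinueWith(t => _permits.Release());
        }
    }
}

public static class RateLimiterDemo
{
    // Runs `taskCount` short operations through the limiter and returns
    // their start times, sorted, measured from the beginning of the run.
    public static async Task<List<TimeSpan>> MeasureStartsAsync(
        int taskCount, int maxPerInterval, TimeSpan interval)
    {
        var limiter = new RateLimiter(maxPerInterval, interval);
        var clock = Stopwatch.StartNew();
        var starts = new List<TimeSpan>();
        var gate = new object();

        var runs = Enumerable.Range(0, taskCount).Select(i => limiter.RunAsync(async () =>
        {
            lock (gate) { starts.Add(clock.Elapsed); }
            await Task.Delay(10); // stand-in for real work
        })).ToArray();

        await Task.WhenAll(runs);
        starts.Sort();
        return starts;
    }

    public static async Task Main()
    {
        var starts = await MeasureStartsAsync(6, 3, TimeSpan.FromMilliseconds(500));
        foreach (var s in starts)
        {
            Console.WriteLine($"{s.TotalMilliseconds:F0} ms");
        }
    }
}
```

Because every permit stays taken from the moment an operation starts until at least one interval later, no window of that length can contain more than maxPerInterval starts. The trade-off is that throughput drops below the limit when operations are slow, since the waiting period begins at completion rather than at start.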
