How to execute tasks in parallel but no more than N tasks per T seconds?

Question

I need to run many tasks in parallel as fast as possible. But if my program runs more than 30 tasks per second, it will be blocked. How can I ensure that no more than 30 tasks run in any 1-second interval?

In other words, a new task must not start if 30 tasks have completed in the last 1-second interval.

My ugly solution:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

Answer

User code should never have to control directly how tasks are scheduled. For one thing, it can't: controlling how tasks run is the job of the TaskScheduler. When user code calls .Start(), it simply hands the task to the scheduler, which by default adds it to the thread-pool queue for execution. await doesn't execute anything; it only waits for tasks that are already running.
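A minimal C# sketch of the difference between starting and awaiting a task, assuming a .NET 6+ console app with top-level statements: a task built with the Task constructor stays in the Created state until Start() hands it to a scheduler, and await only waits for it.

```csharp
using System;
using System.Threading.Tasks;

var ran = false;

// A task created with the Task constructor is "cold": nothing has run yet.
var cold = new Task(() => ran = true);
var statusBeforeStart = cold.Status;
Console.WriteLine(statusBeforeStart); // Created

// Start() only hands the task to a scheduler; TaskScheduler.Default queues it
// to the thread pool, where the delegate runs asynchronously.
cold.Start(TaskScheduler.Default);

// await starts nothing: it just waits for the already-scheduled task.
await cold;
Console.WriteLine(ran);         // True
Console.WriteLine(cold.Status); // RanToCompletion
```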

The TaskScheduler samples show how to create limited-concurrency schedulers, but again, there are better, higher-level options.
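For reference, .NET also ships a limited-concurrency scheduler of its own, ConcurrentExclusiveSchedulerPair; the sketch below uses it in place of the samples' scheduler. It assumes a .NET 6+ console app, and the 8 tasks with 50 ms of synchronous work are made up for illustration. Note that it limits concurrency, not rate.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// ConcurrentScheduler runs at most 2 of its queued tasks at the same time.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);
var factory = new TaskFactory(pair.ConcurrentScheduler);

var gate = new object();
var running = 0; // tasks currently executing
var peak = 0;    // highest value of running observed

var tasks = Enumerable.Range(0, 8)
    .Select(_ => factory.StartNew(() =>
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }
        Thread.Sleep(50); // simulated synchronous work
        lock (gate) { running--; }
    }))
    .ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"Peak concurrency: {peak}"); // at most 2
```

Such a scheduler only counts code that is actually executing on it: an async operation that is waiting for I/O does not occupy a slot, which is one reason higher-level options fit this problem better.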

In any case, the question's code doesn't throttle the tasks themselves; it only limits how many of them can be awaited at a time. By the time the List<Task> reaches Process, the tasks are all running already. This is similar to batching the previous asynchronous operation in a pipeline, allowing only a limited number of messages to pass to the next stage.
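A small sketch of why the question's Process method cannot throttle anything, assuming a hypothetical WorkAsync in place of the real operation: calling an async method runs it up to its first await, so merely building the List<Task> starts every operation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var started = 0;

// Hypothetical operation standing in for the real work. Everything before its
// first await runs synchronously, as soon as the method is called.
async Task WorkAsync()
{
    started++;
    await Task.Delay(100);
}

// Building the list calls WorkAsync five times, so all five operations are in
// flight before anything awaits them.
List<Task> taskList = Enumerable.Range(0, 5).Select(_ => WorkAsync()).ToList();
Console.WriteLine(started); // 5

// Limiting how many of these are awaited at once cannot delay work that has already started.
await Task.WhenAll(taskList);
```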

ActionBlock with delay

The easy, out-of-the-box way is to use an ActionBlock (from TPL Dataflow) with a limited MaxDegreeOfParallelism, which ensures that no more than N operations run at the same time. If we know roughly how long each operation takes, we can add a delay to each one so we don't overshoot the throttle limit.

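In this case, each of the 7 concurrent workers waits at least 250 ms per request, so it performs at most 4 requests per second, for a maximum of 28 requests per second in total. BoundedCapacity means that the block holds at most 7 items; once it is full, the task returned by downloader.SendAsync doesn't complete until there is room, so the posting loop waits asynchronously instead of blocking. This way we avoid flooding the ActionBlock if the operations take too long.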

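using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

var httpClient = new HttpClient();
string[] urls = { /* the URLs to download */ };

// 7 workers, each pausing 250 ms per item: at most 7 * 4 = 28 requests/second.
var downloader = new ActionBlock<string>(
    async url =>
    {
        await Task.Delay(250);
        var response = await httpClient.GetStringAsync(url);
        // Do something with it.
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity = 7 }
);

// Start posting to the downloader; SendAsync waits (asynchronously) while the block is full.
foreach (var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;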
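If adding the Dataflow package isn't an option, the same "limited parallelism plus delay" idea can be sketched with Parallel.ForEachAsync, available since .NET 6. This is a swapped-in technique, not part of the answer above; the 14 integer items stand in for URLs and Task.Delay stands in for the request.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

var items = Enumerable.Range(1, 14).ToArray(); // stand-ins for the URLs
var options = new ParallelOptions { MaxDegreeOfParallelism = 7 };

var gate = new object();
var active = 0;    // operations currently running
var peak = 0;      // highest concurrency observed
var processed = 0; // operations finished
var clock = Stopwatch.StartNew();

await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
    lock (gate) { active++; peak = Math.Max(peak, active); }
    // Same idea as the ActionBlock: each of the 7 workers pauses 250 ms per
    // item, so at most about 28 items are started per second.
    await Task.Delay(250, ct);
    // A real version would call httpClient.GetStringAsync(url) here.
    lock (gate) { active--; processed++; }
});

clock.Stop();
Console.WriteLine($"{processed} items, peak concurrency {peak}, {clock.ElapsedMilliseconds} ms");
```

As with the ActionBlock, the 28-per-second figure is only an upper bound derived from the fixed delay; nothing here tracks actual start times.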

ActionBlock with SemaphoreSlim

Another option is to combine the ActionBlock with a SemaphoreSlim whose permits are replenished periodically by a timer.

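using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int MaxPerSecond = 30;
var httpClient = new HttpClient();
// Each request consumes one permit; permits are only given back by the timer.
var semaphore = new SemaphoreSlim(MaxPerSecond, MaxPerSecond);

// Every second, restore the permits used since the last tick. Releasing a
// fixed 30 would throw SemaphoreFullException whenever some permits were unused.
var refreshTimer = new Timer(_ =>
{
    var used = MaxPerSecond - semaphore.CurrentCount;
    if (used > 0)
    {
        semaphore.Release(used);
    }
});

var downloader = new ActionBlock<string>(
    async url =>
    {
        // Take a permit but don't release it here; releasing it after each
        // request would turn the rate limit into a mere concurrency limit.
        await semaphore.WaitAsync();
        var response = await httpClient.GetStringAsync(url);
        // Do something with it.
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 }
);

// Start the timer right before we start posting
refreshTimer.Change(1000, 1000);
foreach(....)
{
    // Post items to the downloader as in the previous example.
}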
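One caveat about the timer approach: refilling all permits at fixed ticks enforces "30 per tick", not "30 in any 1-second interval". Up to 30 operations can start just before a refill and 30 more right after it, so a short span can see twice the limit. If the limit must hold for every interval, as the question asks, a sliding window is needed: remember the start times of the last N operations and let a new one start only once the oldest of them is at least T old. The following is a minimal sketch of that idea, close to the timeList approach in the question; WaitForSlotAsync is a hypothetical helper, it counts starts rather than completions, and N = 3, T = 200 ms are chosen only to keep the demo short.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int MaxStarts = 3;                     // N: starts allowed per window (demo value)
var window = TimeSpan.FromMilliseconds(200); // T: window length (demo value)

var clock = Stopwatch.StartNew();            // monotonic time source
var recent = new Queue<TimeSpan>();          // start times of the last N starts, oldest first
var gate = new SemaphoreSlim(1, 1);          // lets one caller at a time claim a slot

// Hypothetical helper: waits until one more start keeps every T-long window at
// N starts or fewer, then records the start time and returns it.
async Task<TimeSpan> WaitForSlotAsync()
{
    await gate.WaitAsync();
    try
    {
        if (recent.Count == MaxStarts)
        {
            // The oldest of the last N starts must be at least T old before we go.
            var freeAt = recent.Dequeue() + window;
            var remaining = freeAt - clock.Elapsed;
            while (remaining > TimeSpan.Zero)
            {
                await Task.Delay(remaining);
                remaining = freeAt - clock.Elapsed;
            }
        }
        var now = clock.Elapsed;
        recent.Enqueue(now);
        return now;
    }
    finally
    {
        gate.Release();
    }
}

// Usage: 10 operations launched at once, each claiming a slot before its work.
var startTimes = await Task.WhenAll(Enumerable.Range(0, 10).Select(async _ =>
{
    var startedAt = await WaitForSlotAsync();
    await Task.Delay(50);                    // stand-in for the real operation
    return startedAt;
}));

Console.WriteLine(string.Join(", ", startTimes.OrderBy(t => t).Select(t => (int)t.TotalMilliseconds)));
```

Holding the gate while waiting makes callers line up one after another, which is what keeps the queue of start times in chronological order; the guarantee is that any two starts N positions apart are at least T apart.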
