带节流的异步任务队列,支持多线程 [英] Queue of async tasks with throttling which supports muti-threading
问题描述
我需要实现一个库来请求 vk.com API.问题是 API 每秒仅支持 3 个请求.我想要 API 异步.
I need to implement a library to request vk.com API. The problem is that API supports only 3 requests per second. I would like to have API asynchronous.
重要提示:API 应支持多线程安全访问.
Important: API should support safe accessing from multiple threads.
我的想法是实现一些称为节流器的类,它允许不超过 3 个请求/秒并延迟其他请求.
My idea is implement some class called throttler which allow no more than 3 request/second and delay other request.
界面如下:
public interface IThrottler : IDisposable
{
Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}
用法就像
var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());
如何实现节流器?
目前我将其实现为由后台线程处理的队列.
Currently I implemented it as queue which is processed by background thread.
public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
/// TaskRequest has method Run() to run task
/// TaskRequest uses TaskCompletionSource to provide new task
/// which is resolved when queue processed til this element.
var request = new TaskRequest<TResult>(task);
requestQueue.Enqueue(request);
return request.ResultTask;
}
这是处理队列的后台线程循环的缩短代码:
This is shorten code of background thread loop which process the queue:
private void ProcessQueue(object state)
{
while (true)
{
IRequest request;
while (requestQueue.TryDequeue(out request))
{
/// Delay method calculates actual delay value and calls Thread.Sleep()
Delay();
request.Run();
}
}
}
是否可以在没有后台线程的情况下实现?
推荐答案
所以我们将从一个更简单的问题的解决方案开始,即创建一个最多同时处理 N 个任务的队列,而不是限制到 N 个任务每秒启动,并以此为基础:
So we'll start out with a solution to a simpler problem, that of creating a queue that process up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:
public class TaskQueue
{
private SemaphoreSlim semaphore;
public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}
public TaskQueue(int concurrentRequests)
{
semaphore = new SemaphoreSlim(concurrentRequests);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
我们还将使用以下辅助方法将 TaskCompletionSource
的结果与 `Task:
We'll also use the following helper methods to match the result of a TaskCompletionSource
to a `Task:
public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
task.ContinueWith(t =>
{
switch (t.Status)
{
case TaskStatus.Canceled:
tcs.SetCanceled();
break;
case TaskStatus.Faulted:
tcs.SetException(t.Exception.InnerExceptions);
break;
case TaskStatus.RanToCompletion:
tcs.SetResult(t.Result);
break;
}
});
}
public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
Match(tcs, task.ContinueWith(t => default(T)));
}
现在对于我们的实际解决方案,我们可以做的是每次需要执行节流操作时,我们创建一个 TaskCompletionSource
,然后进入我们的 TaskQueue
并添加一个item 启动任务,将 TCS 与其结果匹配,不等待,然后将任务队列延迟 1 秒.然后任务队列将不允许任务启动,直到过去一秒内不再有 N 个任务启动,而操作本身的结果与 create Task
相同:
Now for our actual solution what we can do is each time we need to perform a throttled operation we create a TaskCompletionSource
, and then go into our TaskQueue
and add an item that starts the task, matches the TCS to its result, doesn't await it, and then delays the task queue for 1 second. The task queue will then not allow a task to start until there are no longer N tasks started in the past second, while the result of the operation itself is the same as the create Task
:
public class Throttler
{
private TaskQueue queue;
public Throttler(int requestsPerSecond)
{
queue = new TaskQueue(requestsPerSecond);
}
public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
var unused = queue.Enqueue(() =>
{
tcs.Match(taskGenerator());
return Task.Delay(TimeSpan.FromSeconds(1));
});
return tcs.Task;
}
public Task Enqueue<T>(Func<Task> taskGenerator)
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
var unused = queue.Enqueue(() =>
{
tcs.Match(taskGenerator());
return Task.Delay(TimeSpan.FromSeconds(1));
});
return tcs.Task;
}
}
这篇关于带节流的异步任务队列,支持多线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!