为异步执行排队操作/委托 [英] Queuing Actions/Delegates for Asyncronous Execution
问题描述
框架中有什么东西可以让我异步执行委托队列吗?
我的意思是我希望代表按照他们排队的顺序一次执行一个,但我希望整个过程异步运行.队列也不是固定的,额外的委托会定期添加,并应在到达队列顶部后立即处理.
我不需要特别使用 Queue
,这只是我描述所需行为的方式.
我可以自己写一些东西来做,但如果有内置的东西我可以使用它会更好.
我简要地查看了 ThreadPool.QueueUserWorkItem
,因为它允许按顺序执行,但可以找到一种令人满意的方法来防止一次执行多次.
框架中有什么东西可以让我异步执行委托队列?
我会将其实现为自定义任务调度程序.然后,您可以将委托作为任务排队和运行,这将为您带来异常处理、取消和 async/await
的所有好处.
使用BlockingCollection
实现一个以串行顺序执行您的委托的任务调度程序非常简单.下面的 SerialTaskScheduler
是 Stephen Toub 的StaTaskScheduler
:
使用系统;使用 System.Collections.Concurrent;使用 System.Collections.Generic;使用 System.Linq;使用 System.Threading;使用 System.Threading.Tasks;命名空间 Console_21628490{//测试课程计划{静态异步任务 DoWorkAsync(){使用 (var scheduler = new SerialTaskScheduler()){var tasks = Enumerable.Range(1, 10).Select(i =>scheduler.Run(() =>{无功睡眠= 1000/我;Thread.Sleep(sleep);Console.WriteLine("Task #" + i + ", sleep: " + sleep);}, CancellationToken.None));等待 Task.WhenAll(tasks);}}静态无效主(字符串 [] args){DoWorkAsync().Wait();Console.ReadLine();}}//串行任务调度器公共密封类 SerialTaskScheduler : TaskScheduler, IDisposable{任务_schedulerTask;BlockingCollection<Task>_任务;线程_schedulerThread;公共 SerialTaskScheduler(){_tasks = new BlockingCollection();_schedulerTask = Task.Run(() =>{_schedulerThread = Thread.CurrentThread;foreach(_tasks.GetConsumingEnumerable() 中的 var 任务)尝试执行任务(任务);});}受保护的覆盖无效队列任务(任务任务){_tasks.Add(task);}受保护的覆盖 IEnumerable获取计划任务(){返回 _tasks.ToArray();}protected override bool TryExecuteTaskInline(任务任务, bool taskWasPreviouslyQueued){返回 _schedulerThread == Thread.CurrentThread &&尝试执行任务(任务);}公共覆盖 int MaximumConcurrencyLevel{得到 { 返回 1;}}公共无效处置(){如果(_schedulerTask != null){_tasks.CompleteAdding();_schedulerTask.Wait();_tasks.Dispose();_tasks = 空;_schedulerTask = null;}}公共任务运行(操作动作,CancellationToken 令牌){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);}公共任务运行(Func action, CancellationToken token){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();}公共任务<T>Run(Func>动作,CancellationToken令牌){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();}}}
输出:
<前>任务 #1,睡眠:1000任务 #2,睡眠:500任务 #3,睡眠:333任务 #4,睡眠:250任务 #5,睡眠:200任务 #6,睡眠:166任务 #7,睡眠:142任务 #8,睡眠:125任务 #9,睡眠:111任务 #10,睡眠:100Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
What I mean by that is I want the delegates to execute one at a time in the order they are queued but I want this whole process to run asynchronously. The queue is not fixed either, additional delegates would be added periodically and should be processed as soon as it reaches the top of the queue.
I don't need to use a Queue
in particular, it's just how I would describe the desired behavior.
I could write something myself to do it but if there is something built in I could use instead that would be better.
I briefly looked at ThreadPool.QueueUserWorkItem
as it allows executing in order but could find a satisfactory way to prevent more than one execution at a time.
Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all benefits of exception handling, cancellation, and async/await
.
Implementing a task scheduler which would execute your delegates in the serial order is quite simple, using BlockingCollection
. The SerialTaskScheduler
below is a simplified version of Stephen Toub's StaTaskScheduler
:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21628490
{
// Test
class Program
{
static async Task DoWorkAsync()
{
using (var scheduler = new SerialTaskScheduler())
{
var tasks = Enumerable.Range(1, 10).Select(i =>
scheduler.Run(() =>
{
var sleep = 1000 / i;
Thread.Sleep(sleep);
Console.WriteLine("Task #" + i + ", sleep: " + sleep);
}, CancellationToken.None));
await Task.WhenAll(tasks);
}
}
static void Main(string[] args)
{
DoWorkAsync().Wait();
Console.ReadLine();
}
}
// SerialTaskScheduler
public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
{
Task _schedulerTask;
BlockingCollection<Task> _tasks;
Thread _schedulerThread;
public SerialTaskScheduler()
{
_tasks = new BlockingCollection<Task>();
_schedulerTask = Task.Run(() =>
{
_schedulerThread = Thread.CurrentThread;
foreach (var task in _tasks.GetConsumingEnumerable())
TryExecuteTask(task);
});
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override bool TryExecuteTaskInline(
Task task, bool taskWasPreviouslyQueued)
{
return _schedulerThread == Thread.CurrentThread &&
TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
public void Dispose()
{
if (_schedulerTask != null)
{
_tasks.CompleteAdding();
_schedulerTask.Wait();
_tasks.Dispose();
_tasks = null;
_schedulerTask = null;
}
}
public Task Run(Action action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
}
public Task Run(Func<Task> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
}
}
Output:
Task #1, sleep: 1000 Task #2, sleep: 500 Task #3, sleep: 333 Task #4, sleep: 250 Task #5, sleep: 200 Task #6, sleep: 166 Task #7, sleep: 142 Task #8, sleep: 125 Task #9, sleep: 111 Task #10, sleep: 100
这篇关于为异步执行排队操作/委托的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!