队列动作/代表的Asyncronous执行 [英] Queuing Actions/Delegates for Asyncronous Execution
问题描述
有什么在框架中,让我异步执行代表队列?
我的意思是我希望代表们在他们排队的顺序一次执行一个,但我想这整个过程异步运行。队列也没有固定,附加代表可能会周期性地添加的,应尽快到达队列的顶部被加工
我并不需要使用队列
特别,它只是我会如何描述所需的行为。
我可以写自己的东西做,但如果有一些建在我可以使用,而不是会更好。
我简单看了一下 ThreadPool.QueueUserWorkItem
,因为它允许执行的命令,但能找到一个满意的方式prevent多个执行的时间。
有什么在框架中,让我来 异步执行代表队列?
我想实现这个作为一个自定义任务调度。然后,您可以排队和运行代表的任务,这将让你的异常处理,取消所有的福利和异步/计谋
。
实现一个计划任务,将执行您的代表在串行顺序很简单,使用 BlockingCollection
。该 SerialTaskScheduler
下面是的斯蒂芬Toub的 StaTaskScheduler
:
使用系统;
使用System.Collections.Concurrent;
使用System.Collections.Generic;
使用System.Linq的;
使用的System.Threading;
使用System.Threading.Tasks;
命名空间Console_21628490
{
// 测试
类节目
{
静态异步任务DoWorkAsync()
{
使用(VAR调度=新SerialTaskScheduler())
{
变种任务= Enumerable.Range(1,10)。选择(ⅰ=>
scheduler.Run(()=>
{
VAR睡眠= 1000 / I;
Thread.sleep代码(睡眠);
Console.WriteLine(任务#+ I +,睡眠:+睡觉);
},CancellationToken.None));
等待Task.WhenAll(任务);
}
}
静态无效的主要(字串[] args)
{
DoWorkAsync()等待()。
到Console.ReadLine();
}
}
// SerialTaskScheduler
公共密封类SerialTaskScheduler:的TaskScheduler,IDisposable的
{
任务_schedulerTask;
BlockingCollection<任务> _tasks;
螺纹_schedulerThread;
公共SerialTaskScheduler()
{
_tasks =新BlockingCollection<任务>();
_schedulerTask = Task.Run(()=>
{
_schedulerThread = Thread.CurrentThread;
的foreach(在_tasks.GetConsumingEnumerable变种任务())
TryExecuteTask(任务);
});
}
保护覆盖无效QueueTask(任务的任务)
{
_tasks.Add(任务);
}
保护覆盖的IEnumerable<任务> GetScheduledTasks()
{
返回_tasks.ToArray();
}
保护覆盖布尔TryExecuteTaskInline(
任务的任务,布尔taskWas previouslyQueued)
{
返回_schedulerThread == Thread.CurrentThread和放大器;&安培;
TryExecuteTask(任务);
}
公众覆盖INT MaximumConcurrencyLevel
{
{返回1; }
}
公共无效的Dispose()
{
如果(_schedulerTask!= NULL)
{
_tasks.CompleteAdding();
_schedulerTask.Wait();
_tasks.Dispose();
_tasks = NULL;
_schedulerTask = NULL;
}
}
公共任务运行(操作动作,的CancellationToken令牌)
{
返回Task.Factory.StartNew(动作,令牌,TaskCreationOptions.None,这一点);
}
公共任务运行(Func键<任务>行动的CancellationToken令牌)
{
返回Task.Factory.StartNew(动作,令牌,TaskCreationOptions.None,这一点).Unwrap();
}
公共任务< T>润LT; T>(Func键<任务< T>>行动的CancellationToken令牌)
{
返回Task.Factory.StartNew(动作,令牌,TaskCreationOptions.None,这一点).Unwrap();
}
}
}
输出:
任务#1,睡眠:1000 任务#2,睡眠:500 任务#3,睡眠:333 任务#4,睡眠:250 任务#5,睡眠:200 任务#6,睡眠:166 任务#7,睡眠:142 任务#8,睡眠:125 任务#9,睡眠:111 任务#10,睡眠:100
Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
What I mean by that is I want the delegates to execute one at a time in the order they are queued but I want this whole process to run asynchronously. The queue is not fixed either, additional delegates would be added periodically and should be processed as soon as it reaches the top of the queue.
I don't need to use a Queue
in particular, it's just how I would describe the desired behavior.
I could write something myself to do it but if there is something built in I could use instead that would be better.
I briefly looked at ThreadPool.QueueUserWorkItem
as it allows executing in order but could find a satisfactory way to prevent more than one execution at a time.
Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all benefits of exception handling, cancellation, and async/await
.
Implementing a task scheduler which would execute your delegates in the serial order is quite simple, using BlockingCollection
. The SerialTaskScheduler
below is a simplified version of Stephen Toub's StaTaskScheduler
:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21628490
{
// Test
class Program
{
static async Task DoWorkAsync()
{
using (var scheduler = new SerialTaskScheduler())
{
var tasks = Enumerable.Range(1, 10).Select(i =>
scheduler.Run(() =>
{
var sleep = 1000 / i;
Thread.Sleep(sleep);
Console.WriteLine("Task #" + i + ", sleep: " + sleep);
}, CancellationToken.None));
await Task.WhenAll(tasks);
}
}
static void Main(string[] args)
{
DoWorkAsync().Wait();
Console.ReadLine();
}
}
// SerialTaskScheduler
public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
{
Task _schedulerTask;
BlockingCollection<Task> _tasks;
Thread _schedulerThread;
public SerialTaskScheduler()
{
_tasks = new BlockingCollection<Task>();
_schedulerTask = Task.Run(() =>
{
_schedulerThread = Thread.CurrentThread;
foreach (var task in _tasks.GetConsumingEnumerable())
TryExecuteTask(task);
});
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override bool TryExecuteTaskInline(
Task task, bool taskWasPreviouslyQueued)
{
return _schedulerThread == Thread.CurrentThread &&
TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
public void Dispose()
{
if (_schedulerTask != null)
{
_tasks.CompleteAdding();
_schedulerTask.Wait();
_tasks.Dispose();
_tasks = null;
_schedulerTask = null;
}
}
public Task Run(Action action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
}
public Task Run(Func<Task> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
{
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
}
}
Output:
Task #1, sleep: 1000 Task #2, sleep: 500 Task #3, sleep: 333 Task #4, sleep: 250 Task #5, sleep: 200 Task #6, sleep: 166 Task #7, sleep: 142 Task #8, sleep: 125 Task #9, sleep: 111 Task #10, sleep: 100
这篇关于队列动作/代表的Asyncronous执行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!