为异步执行排队操作/委托 [英] Queuing Actions/Delegates for Asyncronous Execution

查看:35
本文介绍了为异步执行排队操作/委托的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

框架中有什么东西可以让我异步执行委托队列吗?

我的意思是我希望代表按照他们排队的顺序一次执行一个,但我希望整个过程异步运行.队列也不是固定的,额外的委托会定期添加,并应在到达队列顶部后立即处理.

我不需要特别使用 Queue,这只是我描述所需行为的方式.

我可以自己写一些东西来做,但如果有内置的东西我可以使用它会更好.

我简要地查看了 ThreadPool.QueueUserWorkItem,因为它允许按顺序执行,但可以找到一种令人满意的方法来防止一次执行多次.

解决方案

框架中有什么东西可以让我异步执行委托队列?

我会将其实现为自定义任务调度程序.然后,您可以将委托作为任务排队和运行,这将为您带来异常处理、取消和 async/await 的所有好处.

使用BlockingCollection 实现一个以串行顺序执行您的委托的任务调度程序非常简单.下面的 SerialTask​​SchedulerStephen Toub 的StaTaskScheduler:

使用系统;使用 System.Collections.Concurrent;使用 System.Collections.Generic;使用 System.Linq;使用 System.Threading;使用 System.Threading.Tasks;命名空间 Console_21628490{//测试课程计划{静态异步任务 DoWorkAsync(){使用 (var scheduler = new SerialTask​​Scheduler()){var tasks = Enumerable.Range(1, 10).Select(i =>scheduler.Run(() =>{无功睡眠= 1000/我;Thread.Sleep(sleep);Console.WriteLine("Task #" + i + ", sleep: " + sleep);}, CancellationToken.None));等待 Task.WhenAll(tasks);}}静态无效主(字符串 [] args){DoWorkAsync().Wait();Console.ReadLine();}}//串行任务调度器公共密封类 SerialTask​​Scheduler : TaskScheduler, IDisposable{任务_schedulerTask;BlockingCollection<Task>_任务;线程_schedulerThread;公共 SerialTask​​Scheduler(){_tasks = new BlockingCollection();_schedulerTask = Task.Run(() =>{_schedulerThread = Thread.CurrentThread;foreach(_tasks.GetConsumingEnumerable() 中的 var 任务)尝试执行任务(任务);});}受保护的覆盖无效队列任务(任务任务){_tasks.Add(task);}受保护的覆盖 IEnumerable获取计划任务(){返回 _tasks.ToArray();}protected override bool TryExecuteTaskInline(任务任务, bool taskWasPreviouslyQueued){返回 _schedulerThread == Thread.CurrentThread &&尝试执行任务(任务);}公共覆盖 int MaximumConcurrencyLevel{得到 { 返回 1;}}公共无效处置(){如果(_schedulerTask != null){_tasks.CompleteAdding();_schedulerTask.Wait();_tasks.Dispose();_tasks = 空;_schedulerTask = null;}}公共任务运行(操作动作,CancellationToken 令牌){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);}公共任务运行(Func action, CancellationToken token){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();}公共任务<T>Run(Func>动作,CancellationToken令牌){返回 Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();}}}

输出:

<前>任务 #1,睡眠:1000任务 #2,睡眠:500任务 #3,睡眠:333任务 #4,睡眠:250任务 #5,睡眠:200任务 #6,睡眠:166任务 #7,睡眠:142任务 #8,睡眠:125任务 #9,睡眠:111任务 #10,睡眠:100

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

What I mean by that is I want the delegates to execute one at a time in the order they are queued but I want this whole process to run asynchronously. The queue is not fixed either, additional delegates would be added periodically and should be processed as soon as it reaches the top of the queue.

I don't need to use a Queue in particular, it's just how I would describe the desired behavior.

I could write something myself to do it but if there is something built in I could use instead that would be better.

I briefly looked at ThreadPool.QueueUserWorkItem as it allows executing in order but could find a satisfactory way to prevent more than one execution at a time.

解决方案

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all benefits of exception handling, cancellation, and async/await.

Implementing a task scheduler which would execute your delegates in the serial order is quite simple, using BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            using (var scheduler = new SerialTaskScheduler())
            {
                var tasks = Enumerable.Range(1, 10).Select(i =>
                    scheduler.Run(() =>
                    {
                        var sleep = 1000 / i;
                        Thread.Sleep(sleep);
                        Console.WriteLine("Task #" + i + ", sleep: " + sleep);
                    }, CancellationToken.None));

                await Task.WhenAll(tasks);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.ReadLine();
        }
    }

    // SerialTaskScheduler
    public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
    {
        Task _schedulerTask;
        BlockingCollection<Task> _tasks;
        Thread _schedulerThread;

        public SerialTaskScheduler()
        {
            _tasks = new BlockingCollection<Task>();

            _schedulerTask = Task.Run(() =>
            {
                _schedulerThread = Thread.CurrentThread;

                foreach (var task in _tasks.GetConsumingEnumerable())
                    TryExecuteTask(task);
            });
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _schedulerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_schedulerTask != null)
            {
                _tasks.CompleteAdding();
                _schedulerTask.Wait();
                _tasks.Dispose();
                _tasks = null;
                _schedulerTask = null;
            }
        }

        public Task Run(Action action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

Output:

Task #1, sleep: 1000
Task #2, sleep: 500
Task #3, sleep: 333
Task #4, sleep: 250
Task #5, sleep: 200
Task #6, sleep: 166
Task #7, sleep: 142
Task #8, sleep: 125
Task #9, sleep: 111
Task #10, sleep: 100

这篇关于为异步执行排队操作/委托的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆