任务排序和重入 [英] Task sequencing and re-entracy

查看:20
本文介绍了任务排序和重入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下场景,我认为这可能很常见:

  1. 有一个任务(一个 UI 命令处理程序)可以同步或异步完成.

  2. 命令到达的速度可能比处理的更快.

  3. 如果一个命令已经有一个待处理的任务,新的命令处理程序任务应该被排队并顺序处理.

  4. 每个新任务的结果可能取决于前一个任务的结果.

应该遵守取消,但为了简单起见,我想将其排除在本问题的范围之外.此外,线程安全(并发)不是必需的,但必须支持重入.

这是我想要实现的基本示例(为简单起见,作为控制台应用程序):

使用系统;使用 System.Threading.Tasks;命名空间 ConsoleApp{课程计划{静态无效主(字符串 [] args){var asyncOp = new AsyncOp();功能>handleAsync = async (arg) =>{Console.WriteLine("此任务参数:" + arg);//await Task.Delay(arg);//使其异步返回等待 Task.FromResult(arg);//同步};Console.WriteLine("测试#1...");asyncOp.RunAsync(() => handleAsync(1000));asyncOp.RunAsync(() => handleAsync(900));asyncOp.RunAsync(() => handleAsync(800));asyncOp.CurrentTask.Wait();Console.WriteLine("
按任意键继续测试#2...");Console.ReadLine();asyncOp.RunAsync(() =>{asyncOp.RunAsync(() => handleAsync(200));返回句柄异步(100);});asyncOp.CurrentTask.Wait();Console.WriteLine("
按任意键退出...");Console.ReadLine();}//异步操作类 AsyncOp<T>{任务<T>_pending = Task.FromResult(default(T));公共任务<T>当前任务 { 获取 { 返回 _pending;} }公共任务<T>RunAsync(Func> handler){var 挂起 = _pending;Func<Task<T>包装器=异步()=>{//等待上一个任务var prevResult = 等待挂起;Console.WriteLine("
prev 任务结果:" + prevResult);//启动并等待处理程序返回等待处理程序();};_pending = 包装器();返回_pending;}}}}

输出:

<前>测试 #1...上一个任务结果:0此任务参数:1000上一个任务结果:1000此任务参数:900上一个任务结果:900此任务参数:800按任意键继续测试#2...上一个任务结果:800上一个任务结果:800此任务 arg:200此任务 arg:100按任何一个键退出...

它按照要求工作,直到在测试 #2 中引入重入:

asyncOp.RunAsync(() =>{asyncOp.RunAsync(() => handleAsync(200));返回句柄异步(100);});

想要的输出应该是100,200,而不是200,100,因为已经有100 的待处理外部任务.那显然是因为内部任务是同步执行的,打破了逻辑 var pending = _pending;/* ... */_pending = wrapper() 用于外部任务.

如何让它也适用于测试 #2?

一种解决方案是使用 Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext() 对每个任务强制执行异步.但是,我不想在可能在内部同步的命令处理程序.另外,我不想依赖于任何特定同步上下文的行为(即依赖于 Task.Factory.StartNew 应该在创建的任务之前返回实际开始了).

在实际项目中,我负责上面的 AsyncOp,但无法控制命令处理程序(即,handleAsync 中的任何内容).

解决方案

我几乎忘记了可以手动构建 Task 而无需启动或调度它.然后,Task.Factory.StartNew"与new Task(...).Start" 让我回到正轨.我认为这是 Task 构造函数实际上可能很有用,还有嵌套任务 (Task>) 和 Task.Unwrap():

//异步操作类 AsyncOp<T>{任务<T>_pending = Task.FromResult(default(T));公共任务<T>当前任务 { 获取 { 返回 _pending;} }公共任务<T>RunAsync(Func> handler, bool useSynchronizationContext = false){var 挂起 = _pending;Func<Task<T>包装器=异步()=>{//等待上一个任务var prevResult = 等待挂起;Console.WriteLine("
prev 任务结果:" + prevResult);//启动并等待处理程序返回等待处理程序();};var task = new Task>(wrapper);varinner = task.Unwrap();_pending = 内部;task.RunSynchronously(useSynchronizationContext ?TaskScheduler.FromCurrentSynchronizationContext() :TaskScheduler.Current);返回内部;}}

输出:

<前>测试 #1...上一个任务结果:0此任务参数:1000上一个任务结果:1000此任务参数:900上一个任务结果:900此任务参数:800按任意键继续测试#2...上一个任务结果:800此任务 arg:100上一个任务结果:100此任务 arg:200

如果需要,现在还可以通过添加 lock 来保护 _pending 来使 AsyncOp 成为线程安全的.

更新,通过取消/重启逻辑进一步改进了这一点.

I've got the following scenario, which I think might be quite common:

  1. There is a task (a UI command handler) which can complete either synchronously or asynchronously.

  2. Commands may arrive faster than they are getting processed.

  3. If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.

  4. Each new task's result may depend on the result of the previous task.

Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread-safety (concurrency) is not a requirement, but re-entrancy must be supported.

Here's a basic example of what I'm trying to achieve (as a console app, for simplicity):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int, Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("
Press any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("
Press any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("
prev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

The output:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

It works in accordance with the requirements, until re-entrancy is introduced in test #2:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

The desired output should be 100, 200, rather than 200, 100, because there's already a pending outer task for 100. That's obviously because the inner task executes synchronously, breaking the logic var pending = _pending; /* ... */ _pending = wrapper() for the outer task.

How to make it work for test #2, too?

One solution would be to enforce asynchrony for every task, with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext(). However, I don't want to impose asynchronous execution upon the command handlers which might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e. relying upon that Task.Factory.StartNew should return before the created task has been actually started).

In the real-life project, I'm responsible for what AsyncOp is above, but have no control over the command handlers (i.e., whatever is inside handleAsync).

解决方案

I almost forgot it's possible to construct a Task manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases when the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():

// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("
prev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

The output:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800
this task arg: 100

prev task result:  100
this task arg: 200

It's now also very easy to make AsyncOp thread-safe by adding a lock to protect _pending, if needed.

Updated, this has been further improved with cancel/restart logic.

这篇关于任务排序和重入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆