任务排序和重入 [英] Task sequencing and re-entracy
问题描述
我有以下场景,我认为这可能很常见:
有一个任务(一个 UI 命令处理程序)可以同步或异步完成.
命令到达的速度可能比处理的更快.
如果一个命令已经有一个待处理的任务,新的命令处理程序任务应该被排队并顺序处理.
每个新任务的结果可能取决于前一个任务的结果.
应该遵守取消,但为了简单起见,我想将其排除在本问题的范围之外.此外,线程安全(并发)不是必需的,但必须支持重入.
这是我想要实现的基本示例(为简单起见,作为控制台应用程序):
使用系统;使用 System.Threading.Tasks;命名空间 ConsoleApp{课程计划{静态无效主(字符串 [] args){var asyncOp = new AsyncOp();功能>handleAsync = async (arg) =>{Console.WriteLine("此任务参数:" + arg);//await Task.Delay(arg);//使其异步返回等待 Task.FromResult(arg);//同步};Console.WriteLine("测试#1...");asyncOp.RunAsync(() => handleAsync(1000));asyncOp.RunAsync(() => handleAsync(900));asyncOp.RunAsync(() => handleAsync(800));asyncOp.CurrentTask.Wait();Console.WriteLine("
按任意键继续测试#2...");Console.ReadLine();asyncOp.RunAsync(() =>{asyncOp.RunAsync(() => handleAsync(200));返回句柄异步(100);});asyncOp.CurrentTask.Wait();Console.WriteLine("
按任意键退出...");Console.ReadLine();}//异步操作类 AsyncOp<T>{任务<T>_pending = Task.FromResult(default(T));公共任务<T>当前任务 { 获取 { 返回 _pending;} }公共任务<T>RunAsync(Func> handler){var 挂起 = _pending;Func<Task<T>包装器=异步()=>{//等待上一个任务var prevResult = 等待挂起;Console.WriteLine("
prev 任务结果:" + prevResult);//启动并等待处理程序返回等待处理程序();};_pending = 包装器();返回_pending;}}}}
输出:
<前>测试 #1...上一个任务结果:0此任务参数:1000上一个任务结果:1000此任务参数:900上一个任务结果:900此任务参数:800按任意键继续测试#2...上一个任务结果:800上一个任务结果:800此任务 arg:200此任务 arg:100按任何一个键退出...它按照要求工作,直到在测试 #2 中引入重入:
asyncOp.RunAsync(() =>{asyncOp.RunAsync(() => handleAsync(200));返回句柄异步(100);});
想要的输出应该是100
,200
,而不是200
,100
,因为已经有100
的待处理外部任务.那显然是因为内部任务是同步执行的,打破了逻辑 var pending = _pending;/* ... */_pending = wrapper()
用于外部任务.
如何让它也适用于测试 #2?
一种解决方案是使用 Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
对每个任务强制执行异步.但是,我不想在可能在内部同步的命令处理程序.另外,我不想依赖于任何特定同步上下文的行为(即依赖于 Task.Factory.StartNew
应该在创建的任务之前返回实际开始了).
在实际项目中,我负责上面的 AsyncOp
,但无法控制命令处理程序(即,handleAsync
中的任何内容).
我几乎忘记了可以手动构建 Task
而无需启动或调度它.然后,Task.Factory.StartNew"与new Task(...).Start" 让我回到正轨.我认为这是 Task
构造函数实际上可能很有用,还有嵌套任务 (Task
) 和 Task.Unwrap()
:
//异步操作类 AsyncOp<T>{任务<T>_pending = Task.FromResult(default(T));公共任务<T>当前任务 { 获取 { 返回 _pending;} }公共任务<T>RunAsync(Func> handler, bool useSynchronizationContext = false){var 挂起 = _pending;Func<Task<T>包装器=异步()=>{//等待上一个任务var prevResult = 等待挂起;Console.WriteLine("
prev 任务结果:" + prevResult);//启动并等待处理程序返回等待处理程序();};var task = new Task>(wrapper);varinner = task.Unwrap();_pending = 内部;task.RunSynchronously(useSynchronizationContext ?TaskScheduler.FromCurrentSynchronizationContext() :TaskScheduler.Current);返回内部;}}
输出:
<前>测试 #1...上一个任务结果:0此任务参数:1000上一个任务结果:1000此任务参数:900上一个任务结果:900此任务参数:800按任意键继续测试#2...上一个任务结果:800此任务 arg:100上一个任务结果:100此任务 arg:200如果需要,现在还可以通过添加 lock
来保护 _pending
来使 AsyncOp
成为线程安全的.
更新,通过取消/重启逻辑进一步改进了这一点.
I've got the following scenario, which I think might be quite common:
There is a task (a UI command handler) which can complete either synchronously or asynchronously.
Commands may arrive faster than they are getting processed.
If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.
Each new task's result may depend on the result of the previous task.
Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread-safety (concurrency) is not a requirement, but re-entrancy must be supported.
Here's a basic example of what I'm trying to achieve (as a console app, for simplicity):
using System;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var asyncOp = new AsyncOp<int>();
Func<int, Task<int>> handleAsync = async (arg) =>
{
Console.WriteLine("this task arg: " + arg);
//await Task.Delay(arg); // make it async
return await Task.FromResult(arg); // sync
};
Console.WriteLine("Test #1...");
asyncOp.RunAsync(() => handleAsync(1000));
asyncOp.RunAsync(() => handleAsync(900));
asyncOp.RunAsync(() => handleAsync(800));
asyncOp.CurrentTask.Wait();
Console.WriteLine("
Press any key to continue to test #2...");
Console.ReadLine();
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
asyncOp.CurrentTask.Wait();
Console.WriteLine("
Press any key to exit...");
Console.ReadLine();
}
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("
prev task result: " + prevResult);
// start and await the handler
return await handler();
};
_pending = wrapper();
return _pending;
}
}
}
}
The output:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
It works in accordance with the requirements, until re-entrancy is introduced in test #2:
asyncOp.RunAsync(() =>
{
asyncOp.RunAsync(() => handleAsync(200));
return handleAsync(100);
});
The desired output should be 100
, 200
, rather than 200
, 100
, because there's already a pending outer task for 100
. That's obviously because the inner task executes synchronously, breaking the logic var pending = _pending; /* ... */ _pending = wrapper()
for the outer task.
How to make it work for test #2, too?
One solution would be to enforce asynchrony for every task, with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
. However, I don't want to impose asynchronous execution upon the command handlers which might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e. relying upon that Task.Factory.StartNew
should return before the created task has been actually started).
In the real-life project, I'm responsible for what AsyncOp
is above, but have no control over the command handlers (i.e., whatever is inside handleAsync
).
I almost forgot it's possible to construct a Task
manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases when the Task<TResult>
constructor may actually be useful, along with nested tasks (Task<Task<T>>
) and Task.Unwrap()
:
// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));
public Task<T> CurrentTask { get { return _pending; } }
public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("
prev task result: " + prevResult);
// start and await the handler
return await handler();
};
var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;
task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);
return inner;
}
}
The output:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
It's now also very easy to make AsyncOp
thread-safe by adding a lock
to protect _pending
, if needed.
Updated, this has been further improved with cancel/restart logic.
这篇关于任务排序和重入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!