A pattern to pause/resume an async task?


Question

I have a mostly IO-bound continuous task (a background spellchecker talking to a spellcheck server). Sometimes this task needs to be put on hold and resumed later, depending on user activity.

While suspend/resume is essentially what async/await does, I've found little information on how to implement the actual pause/play logic for an asynchronous method. Is there a recommended pattern for this?

I've also looked at using Stephen Toub's AsyncManualResetEvent for this, but thought it might be overkill.

Recommended answer

Updated for 2019: I recently had a chance to revisit this code. Below is a complete example as a console app (warning: PauseTokenSource needs good unit testing).

Note that in my case the requirement was that, by the time the consumer-side code (which requested the pause) continues, the producer-side code has already reached the paused state. Thus, when the UI is ready to reflect the paused state, all background activity is expected to have already stopped.

using System;
using System.Threading.Tasks;
using System.Threading;

namespace Console_19613444
{
    class Program
    {
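        // PauseTokenSource - owned by the code that requests pause/resume.
        // PauseAsync completes only after the worker has confirmed the pause
        // by reaching PauseIfRequestedAsync.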
        public class PauseTokenSource
        {
            bool _paused = false;
            bool _pauseRequested = false;

            TaskCompletionSource<bool> _resumeRequestTcs;
            TaskCompletionSource<bool> _pauseConfirmationTcs;

            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);

            public PauseToken Token { get { return new PauseToken(this); } }

            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        var resumeRequestTcs = _resumeRequestTcs;
                        _paused = false;
                        _pauseRequested = false;
                        _resumeRequestTcs = null;
                        _pauseConfirmationTcs = null;
                        resumeRequestTcs.TrySetResult(true);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }

                    Task pauseConfirmationTask = null;

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }

                    await pauseConfirmationTask;

                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            private async Task WaitForResumeRequestAsync(CancellationToken token)
            {
                // Capture the field in a local: ResumeAsync sets the field to null,
                // so a late cancellation callback must not dereference it.
                var tcs = _resumeRequestTcs;
                using (token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await tcs.Task;
                }
            }

            private async Task WaitForPauseConfirmationAsync(CancellationToken token)
            {
                // Same capture as above, for the same reason.
                var tcs = _pauseConfirmationTcs;
                using (token.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await tcs.Task;
                }
            }

            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;

                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    resumeRequestTask = WaitForResumeRequestAsync(token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }

                await resumeRequestTask;
            }
        }

        // PauseToken - handed to the worker, which calls PauseIfRequestedAsync at safe points
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public Task<bool> IsPaused() { return _source.IsPaused(); }

            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source.PauseIfRequestedAsync(token);
            }
        }

        // Basic usage

        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();

                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    // Pass the token so a paused worker can still be cancelled.
                    await pause.PauseIfRequestedAsync(token);
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");

                    await Task.Delay(1000, token);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                await pts.ResumeAsync();
                Console.WriteLine("After resume");
            }
        }

        static async Task Main()
        {
            await Test(CancellationToken.None);
        }
    }
}
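
For comparison, here is a minimal sketch of the simpler gate-style approach that the question alludes to. It is only an illustration under stated assumptions: SimplePauseGate and its members are hypothetical names, not part of the code above and not Stephen Toub's actual AsyncManualResetEvent. Unlike PauseTokenSource, Pause() returns immediately and does not wait for the worker to confirm that it has stopped, so it would not satisfy the requirement described earlier.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only: a one-way pause gate.
public sealed class SimplePauseGate
{
    private readonly object _sync = new object();

    // null means "not paused"; otherwise waiters await this task until Resume().
    private TaskCompletionSource<bool> _resumeTcs;

    public bool IsPaused
    {
        get { lock (_sync) { return _resumeTcs != null; } }
    }

    // Closes the gate. Returns immediately; does NOT wait for the worker.
    public void Pause()
    {
        lock (_sync)
        {
            if (_resumeTcs == null)
            {
                _resumeTcs = new TaskCompletionSource<bool>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
            }
        }
    }

    // Opens the gate and releases every waiter. A no-op when not paused.
    public void Resume()
    {
        TaskCompletionSource<bool> tcs;
        lock (_sync)
        {
            tcs = _resumeTcs;
            _resumeTcs = null;
        }
        // Complete outside the lock so no continuation runs while it is held.
        tcs?.TrySetResult(true);
    }

    // Called by the worker at safe points; completes at once when not paused.
    public async Task WaitWhilePausedAsync(
        CancellationToken token = default(CancellationToken))
    {
        // A private TCS carries cancellation, so the shared one is never cancelled.
        var cancelTcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using (token.Register(() => cancelTcs.TrySetCanceled(),
                              useSynchronizationContext: false))
        {
            while (true)
            {
                TaskCompletionSource<bool> tcs;
                lock (_sync) { tcs = _resumeTcs; }
                if (tcs == null)
                {
                    return;
                }
                // Whichever finishes first wins; awaiting it rethrows cancellation.
                var completed = await Task.WhenAny(tcs.Task, cancelTcs.Task);
                await completed;
            }
        }
    }
}
```

A worker loop would call await gate.WaitWhilePausedAsync(token) wherever the code above calls pause.PauseIfRequestedAsync(token). The trade-off is that the caller of Pause() cannot tell when the worker has actually stopped, which is exactly what the confirmation handshake in PauseTokenSource adds.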
