是否可以在异步 python 中暂停和重新启动任务? [英] Is it possible to suspend and restart tasks in async python?

查看:37
本文介绍了是否可以在异步 python 中暂停和重新启动任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题应该很简单,但我找不到任何相关信息.

The question should be simple enough but I couldn't find anything about it.

我有一个异步 python 程序,其中包含一个运行时间相当长的任务,我希望能够在任意点暂停和重新启动(任意点当然意味着在任何有 await 关键字的地方).我希望有一些类似于 task.suspend()task.resume() 的东西,但似乎没有.在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己做这件事?我不想在每次等待之前放置一个 event.wait()...

I have an async python program that contains a rather long running task that I want to be able to suspend and restart at arbitrary points (arbitrary of course meaning everywhere where there's an await keyword). I was hoping there was something along the lines of task.suspend() and task.resume() but it seems there isn't. Is there any API for this on task- or event-loop-level or would I need to do this myself somehow? I don't want to place an event.wait() before every await...

谢谢

推荐答案

您的要求是可能的,但并非微不足道.首先,请注意,您永远不能在 every await 上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep(),或没有准备好返回数据的 stream.read().等待一个协程立即开始执行它,如果协程可以立即返回,它不会进入事件循环.await 仅在等待者(或 等待者等)请求时暂停到事件循环.这些问题的更多详细信息:[1][2],[3][4].

What you're asking for is possible, but not trivial. First, note that you can never have suspends on every await, but only on those that result in suspension of the coroutine, such as asyncio.sleep(), or a stream.read() that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].

考虑到这一点,您可以使用 this answer 中的技术来拦截带有附加代码的协程的每次恢复检查任务是否暂停,如果暂停,则在继续之前等待恢复事件.

With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.

import asyncio

class Suspendable:
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

测试:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())

这篇关于是否可以在异步 python 中暂停和重新启动任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆