是否可以在异步 python 中暂停和重新启动任务? [英] Is it possible to suspend and restart tasks in async python?
问题描述
问题应该很简单,但我找不到任何相关信息.
The question should be simple enough but I couldn't find anything about it.
我有一个异步 python 程序,其中包含一个运行时间相当长的任务,我希望能够在任意点暂停和重新启动(任意点当然意味着在任何有 await 关键字的地方).我希望有一些类似于 task.suspend()
和 task.resume()
的东西,但似乎没有.在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己做这件事?我不想在每次等待之前放置一个 event.wait()
...
I have an async python program that contains a rather long running task that I want to be able to suspend and restart at arbitrary points (arbitrary of course meaning everywhere where there's an await keyword).
I was hoping there was something along the lines of task.suspend()
and task.resume()
but it seems there isn't.
Is there any API for this on task- or event-loop-level or would I need to do this myself somehow? I don't want to place an event.wait()
before every await...
谢谢
推荐答案
您的要求是可能的,但并非微不足道.首先,请注意,您永远不能在 every await
上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep()
,或没有准备好返回数据的 stream.read()
.等待一个协程立即开始执行它,如果协程可以立即返回,它不会进入事件循环.await
仅在等待者(或其 等待者等)请求时暂停到事件循环.这些问题的更多详细信息:[1],[2],[3]、[4].
What you're asking for is possible, but not trivial. First, note that you can never have suspends on every await
, but only on those that result in suspension of the coroutine, such as asyncio.sleep()
, or a stream.read()
that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await
only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].
考虑到这一点,您可以使用 this answer 中的技术来拦截带有附加代码的协程的每次恢复检查任务是否暂停,如果暂停,则在继续之前等待恢复事件.
With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.
import asyncio
class Suspendable:
def __init__(self, target):
self._target = target
self._can_run = asyncio.Event()
self._can_run.set()
self._task = asyncio.ensure_future(self)
def __await__(self):
target_iter = self._target.__await__()
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
# This "while" emulates yield from.
while True:
# wait for can_run before resuming execution of self._target
try:
while not self._can_run.is_set():
yield from self._can_run.wait().__await__()
except BaseException as err:
send, message = iter_throw, err
# continue with our regular program
try:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err
def suspend(self):
self._can_run.clear()
def is_suspended(self):
return not self._can_run.is_set()
def resume(self):
self._can_run.set()
def get_task(self):
return self._task
测试:
import time
async def heartbeat():
while True:
print(time.time())
await asyncio.sleep(.2)
async def main():
task = Suspendable(heartbeat())
for i in range(5):
print('suspending')
task.suspend()
await asyncio.sleep(1)
print('resuming')
task.resume()
await asyncio.sleep(1)
asyncio.run(main())
这篇关于是否可以在异步 python 中暂停和重新启动任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!