asyncio:是否可以取消由 Executor 运行的未来? [英] asyncio: Is it possible to cancel a future been run by an Executor?
问题描述
我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用.
I would like to start a blocking function in an Executor using the asyncio call loop.run_in_executor and then cancel it later, but that doesn't seem to be working for me.
代码如下:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
我希望上面的代码只允许阻塞函数输出:
I would expect the code above to only allow the blocking function to output:
blocking 0/5
blocking 1/5
然后查看非阻塞函数的输出.但相反,即使在我取消之后,阻塞的未来仍在继续.
and then see the output of the non blocking function. But instead the blocking future continues on even after I have canceled.
有可能吗?还有其他方法吗?
Is it possible? Is there some other way of doing it?
谢谢
更多关于使用 asyncio 运行阻塞和非阻塞代码的讨论:如何将阻塞和非阻塞代码与 asyncio 接口
More discussion on running blocking and non-blocking code using asyncio: How to interface blocking and non-blocking code with asyncio
推荐答案
在这种情况下,一旦 Future
真正开始运行,就没有办法取消它,因为你依赖于concurrent.futures.Future
和 的行为其文档声明如下:
In this case, there is no way to cancel the Future
once it has actually started running, because you're relying on the behavior of concurrent.futures.Future
, and its docs state the following:
cancel()
尝试取消通话.如果当前正在执行调用并且不能被取消,则该方法将返回False
,否则调用将被取消,方法将返回 True
.
因此,取消成功的唯一时间是任务仍在 Executor
内挂起.现在,您实际上正在使用一个 asyncio.Future
包裹在 concurrent.futures.Future
周围,并且实际上是由返回的 asyncio.Future
loop.run_in_executor()
如果您在调用 cancel()
后尝试 yield from
将引发 CancellationError
,即使底层任务实际上已经在运行.但是,它不会实际上取消Executor
中任务的执行.
So, the only time the cancellation would be successful is if the task is still pending inside of the Executor
. Now, you're actually using an asyncio.Future
wrapped around a concurrent.futures.Future
, and in practice the asyncio.Future
returned by loop.run_in_executor()
will raise a CancellationError
if you try to yield from
it after you call cancel()
, even if the underlying task is actually already running. But, it won't actually cancel the execution of the task inside the Executor
.
如果您需要实际取消任务,则需要使用更常规的方法来中断在线程中运行的任务.您如何执行此操作的具体细节取决于用例.对于您在示例中展示的用例,您可以使用 threading.Event
:
If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that is use-case dependent. For the use-case you presented in the example, you could use a threading.Event
:
def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
这篇关于asyncio:是否可以取消由 Executor 运行的未来?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!