asyncio:是否可以取消由 Executor 运行的未来? [英] asyncio: Is it possible to cancel a future been run by an Executor?

查看:72
本文介绍了asyncio:是否可以取消由 Executor 运行的未来?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用.

I would like to start a blocking function in an Executor using the asyncio call loop.run_in_executor and then cancel it later, but that doesn't seem to be working for me.

代码如下:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我希望上面的代码只允许阻塞函数输出:

I would expect the code above to only allow the blocking function to output:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出.但相反,即使在我取消之后,阻塞的未来仍在继续.

and then see the output of the non blocking function. But instead the blocking future continues on even after I have canceled.

有可能吗?还有其他方法吗?

Is it possible? Is there some other way of doing it?

谢谢

更多关于使用 asyncio 运行阻塞和非阻塞代码的讨论:如何将阻塞和非阻塞代码与 asyncio 接口

More discussion on running blocking and non-blocking code using asyncio: How to interface blocking and non-blocking code with asyncio

推荐答案

在这种情况下,一旦 Future 真正开始运行,就没有办法取消它,因为你依赖于concurrent.futures.Future 的行为其文档声明如下:

In this case, there is no way to cancel the Future once it has actually started running, because you're relying on the behavior of concurrent.futures.Future, and its docs state the following:

cancel()

尝试取消通话.如果当前正在执行调用并且不能被取消,则该方法将返回False,否则调用将被取消,方法将返回 True.

因此,取消成功的唯一时间是任务仍在 Executor 内挂起.现在,您实际上正在使用一个 asyncio.Future 包裹在 concurrent.futures.Future 周围,并且实际上是由返回的 asyncio.Futureloop.run_in_executor() 如果您在调用 cancel() 后尝试 yield from 将引发 CancellationError,即使底层任务实际上已经在运行.但是,它不会实际上取消Executor中任务的执行.

So, the only time the cancellation would be successful is if the task is still pending inside of the Executor. Now, you're actually using an asyncio.Future wrapped around a concurrent.futures.Future, and in practice the asyncio.Future returned by loop.run_in_executor() will raise a CancellationError if you try to yield from it after you call cancel(), even if the underlying task is actually already running. But, it won't actually cancel the execution of the task inside the Executor.

如果您需要实际取消任务,则需要使用更常规的方法来中断在线程中运行的任务.您如何执行此操作的具体细节取决于用例.对于您在示例中展示的用例,您可以使用 threading.Event:

If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that is use-case dependent. For the use-case you presented in the example, you could use a threading.Event:

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func

这篇关于asyncio:是否可以取消由 Executor 运行的未来?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆