如果一项失败,如何取消收集中的所有剩余任务? [英] How to cancel all remaining tasks in gather if one fails?

查看:68
本文介绍了如果一项失败,如何取消收集中的所有剩余任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果 gather 的一项任务引发异常,则仍然允许其他任务继续.

In case one task of gather raises an exception, the others are still allowed to continue.

嗯,那不是我真正需要的.我想区分致命的错误和需要取消所有剩余任务的错误,而不是不是的错误应该在允许其他任务继续执行的同时进行记录.

Well, that's not exactly what I need. I want to distinguish between errors that are fatal and need to cancel all remaining tasks, and errors that are not and instead should be logged while allowing other tasks to continue.

这是我实现此失败的尝试:

Here is my failed attempt to implement this:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

(最终等待睡眠是为了防止解释器立即关闭,这将自行取消所有任务)

(the finally await sleep here is to prevent the interpreter from closing immediately, which would on its own cancel all tasks)

奇怪的是,在 gather 上调用 cancel 并不能真正取消它!

Oddly, calling cancel on the gather does not actually cancel it!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

我对此行为感到非常惊讶,因为它似乎与

I am very surprised by this behavior since it seems to be contradictory to the documentation, which states:

asyncio.gather(* coros_or_futures,loop = None,return_exceptions = False)

从给定的协程对象或期货返回未来的汇总结果.

Return a future aggregating results from the given coroutine objects or futures.

(...)

取消:如果取消外部Future,则所有子项(尚未完成)也将被取消.(...)

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)

我在这里想念什么?如何取消剩余的任务?

What am I missing here? How to cancel the remaining tasks?

推荐答案

您的实现的问题在于,在 sleepers 已经产生之后,它将调用 sleepers.cancel().从技术上讲, gather()返回的未来处于完成状态,因此取消必须为空.

The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. Technically the future returned by gather() is in a completed state, so its cancellation must be no-op.

要更正代码,您只需要自己取消子级,而不要相信 gather 的未来.当然,协程本身并不是可以取消的,因此您需要先将它们转换为任务(无论如何, gather 都可以完成任务,因此您无需做任何额外的工作).例如:

To correct the code, you just need to cancel the children yourself instead of trusting gather's future to do it. Of course, coroutines are not themselves cancelable, so you need to convert them to tasks first (which gather would do anyway, so you're doing no extra work). For example:

async def main():
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await sleep(5)

我对这种行为感到非常惊讶,因为它似乎与文档相矛盾[...]

I am very surprised by this behavior since it seems to be contradictory to the documentation[...]

使用 gather 收集的最初绊脚石是,它实际上并不是 run 任务,它只是等待任务完成的助手.因此,如果其中的一些失败并导致异常, gather 不会费心地取消剩余的任务-它只是放弃等待并传播异常,而将剩余的任务留在后台继续进行.它被报告为错误,但由于向后兼容而未得到修复,并且该行为已被记录且未更改从一开始就.但是,这里有另一个缺点:文档明确承诺可以取消返回的未来.您的代码确实做到了这一点,并且行不通,没有一个明显的原因(至少花了我一段时间才弄清楚,并要求阅读 Future 的合同实际上阻止了此工作.到您调用 cancel()时, gather 返回的未来已经完成,取消已完成的未来是没有意义的,只是没事(原因是,完成的将来会有明确的结果,而外部代码可以观察到结果.取消将来会改变其结果,这是不允许的.)

The initial stumbling block with gather is that it doesn't really run tasks, it's just a helper to wait for them to finish. For this reason gather doesn't bother to cancel the remaining tasks if some of them fails with an exception - it just abandons the wait and propagates the exception, leaving the remaining tasks to proceed in the background. This was reported as a bug, but wasn't fixed for backward compatibility and because the behavior is documented and unchanged from the beginning. But here we have another wart: the documentation explicitly promises being able to cancel the returned future. Your code does exactly that and that doesn't work, without it being obvious why (at least it took me a while to figure it out, and required reading the source). It turns out that the contract of Future actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless, it is just no-op. (The reason is that a completed future has a well-defined result that could have been observed by outside code. Cancelling a future would change its result, which is not allowed.)

换句话说,该文档并不是错误,因为如果您在 await sleepers 完成之前执行取消操作,则该取消操作会起作用.但是,它是误导性的,因为它似乎允许在其重要的使用案例之一中取消 gather(),但实际上是不允许的.

In other words, the documentation is not wrong, because canceling would have worked if you had performed it prior to await sleepers having completed. However, it's misleading, because it appears to allow canceling gather() in this important use case of one of its awaitable raising, but in reality doesn't.

使用 gather 时弹出的此类问题是许多人急切等待(无双关语的)三人式托儿所的原因

Problems like this that pop up when using gather are reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio.

这篇关于如果一项失败,如何取消收集中的所有剩余任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆