使用asyncio.wait在任务异常后重试任务 [英] Retry task after task exception with asyncio.wait

查看:116
本文介绍了使用asyncio.wait在任务异常后重试任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有多个应同时运行的协程,其中一些可能会引发异常.在这种情况下,应重新运行协程.我该如何完成?我正在尝试做的最小演示:

I have multiple coroutines that should be run simultaneously, some of which may throw an exception. In those cases the coroutines should be run again. How do I accomplish this? Minimum demo of what I'm trying to do:

import asyncio
import time

t = time.time()


async def c1():
    print("finished c1 {}".format(time.time() - t))


async def c2():
    await asyncio.sleep(3)
    print("finished c2 {}".format(time.time() - t))


called = False


async def c3():
    global called
    # raises an exception the first time it's called
    if not called:
        called = True
        raise RuntimeError("c3 called the first time")
    print("finished c3 {}".format(time.time() - t))


async def run():
    pending = {c1(), c2(), c3()}

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                pending.add(task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))


asyncio.get_event_loop().run_until_complete(run())

c3()表示某些协程将失败,需要重新运行.演示的问题是完成的任务已完成并具有异常集,因此当我将其放回挂起的集时,下一个运行循环将立即退出,而无需重新运行 c3(),因为它已经完成了.

c3() represents that some coroutines will fail and need to be rerun. The problem with the demo is the finished task is finished and has exception set, so when I put it back onto the pending set, the next run loop exits immediately without rerunning c3() because it's already done.

有没有一种方法可以清除任务,使其再次运行 c3()?我知道附加到任务的协程实例将无法再等待,否则我会得到

Is there a way to clear the task so that it will run c3() again? I know that the coroutine instance attached to the task cannot be awaited on again else I get

RuntimeError('无法重用已经等待的协程',)

这意味着我必须手动管理从协程实例到生成它的协程的映射,然后使用 task._coro 检索失败的协程实例-是这样吗?

which means I have to manually manage a map from coroutine instance to the coroutine that generated it, then retrieve the failed coroutine instance with task._coro - is this right?

推荐答案

编辑:任务本身可能是地图中的键,它更简洁.

EDIT: the task itself could be the key in the map, which is cleaner.

async def run():
    tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
    pending = set(tasks.keys())

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                coro = tasks[task]
                new_task = asyncio.ensure_future(coro())
                tasks[new_task] = coro
                pending.add(new_task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))

这篇关于使用asyncio.wait在任务异常后重试任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆