Using queues results in asyncio exception "got Future <Future pending> attached to a different loop"


Problem description

I'm trying to run this simple code with asyncio queues, but I'm getting exceptions, even nested exceptions.

I would like to get some help with making queues in asyncio work correctly:

import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)


num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []


async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')


async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()


async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()


asyncio.run(run(), debug=True)
print('Done!')

Output:

waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception 
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "temp4.py", line 46, in worker
    in_queue.task_done()
  File "Python37\lib\asyncio\queues.py", line 202, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "temp4.py", line 63, in <module>
    asyncio.run(run(), debug=True)
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop

This is the basic flow. What I would like to do later is run more requests on more workers, where each worker will move a number from in_queue to out_queue, and the saver will then print the numbers from out_queue.

Answer

Your queues must be created inside the loop. You created them outside the loop created for asyncio.run(), so they use events.get_event_loop(). asyncio.run() creates a new loop, and futures created for the queue in one loop can't then be used in the other.
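The fix can be sketched as a minimal, self-contained pattern (hypothetical produce/consume names, not from the original code): the queue is created inside the coroutine that asyncio.run() drives, so it can only ever be bound to that loop. On Python 3.10+ asyncio.Queue binds lazily to the running loop, so this pattern works on all versions.

```python
import asyncio

async def produce(queue):
    for n in range(3):
        await queue.put(n)

async def consume(queue, results):
    while True:
        num = await queue.get()
        results.append(num)
        queue.task_done()

async def main():
    queue = asyncio.Queue()  # created inside the running loop: safe on all versions
    results = []
    worker = asyncio.create_task(consume(queue, results))
    await produce(queue)
    await queue.join()       # waits until every item has been marked task_done()
    worker.cancel()
    return results

print(asyncio.run(main()))   # -> [0, 1, 2]
```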

Create your queues in your top-level run() coroutine, and either pass them to the coroutines that need them, or use contextvars.ContextVar objects if you must use globals.

You also need to clean up how you handle task cancellation inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception in the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it.
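This cancellation contract can be illustrated with a small sketch (the well_behaved/main names are hypothetical): the clean-up code runs, and the re-raise lets the task actually finish as cancelled.

```python
import asyncio

cleanup_ran = []

async def well_behaved():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        cleanup_ran.append(True)  # clean-up work goes here
        raise                     # re-raise so the task really ends as cancelled

async def main():
    task = asyncio.create_task(well_behaved())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "not cancelled"

print(asyncio.run(main()))  # -> cancelled
```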

Your task code catches all exceptions without re-raising, including CancelledError, so you block proper cancellations.

What does happen during cancellation, however, is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you are actually handling a queue item, but your code calls it when an exception occurs while waiting for an item to appear.

If you need to use try...finally: in_queue.task_done(), put it around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside that try block. You don't want to mark items as done that you never actually received.
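That placement can be sketched like this (hypothetical worker/main names): get() sits outside the try block, so a cancellation while waiting for an item never triggers the finally, and the task_done() count stays balanced.

```python
import asyncio

async def worker(queue, seen):
    while True:
        num = await queue.get()  # cancellation here skips the finally below
        try:
            seen.append(num)     # "handle" the item
        finally:
            queue.task_done()    # only runs for items actually received

async def main():
    queue = asyncio.Queue()
    seen = []
    task = asyncio.create_task(worker(queue, seen))
    for n in range(3):
        await queue.put(n)
    await queue.join()           # no ValueError: counts stay balanced
    task.cancel()
    return seen

print(asyncio.run(main()))  # -> [0, 1, 2]
```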

Finally, when you print exceptions, you want to print their repr(); for historical reasons, the str() conversion of exceptions produces their .args value, which is not very helpful for CancelledError exceptions, which have an empty .args. Use {e!r} in formatted strings, so you can see what exception you are catching:
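A two-line illustration of the difference:

```python
import asyncio

# str() of a CancelledError is empty (its .args are empty), so f"{e}" shows
# nothing useful; f"{e!r}" shows the exception type.
e = asyncio.CancelledError()
print(f"as str:  {e}")    # prints nothing after the label
print(f"as repr: {e!r}")  # prints CancelledError()
```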

worker-0 exception CancelledError()

So, corrected code, with the saver() task enabled, the queues created inside of run(), and task exception handling cleaned up, would be:

import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)


num_workers = 1


async def run():
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'in_queue' and produces to 'out_queue':
    tasks = []
    for i in range(num_workers):
        tasks.append(asyncio.create_task(
            worker(in_queue, out_queue, name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver(out_queue)))

    await in_queue.join()
    await out_queue.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    print('done')

async def worker(in_queue, out_queue, name):
    print(f"{name} started")
    try:
        while True:
            num = await in_queue.get()
            try:
                print(f'{name} got {num}')
                await asyncio.sleep(0)
                await out_queue.put(num)
            except Exception as e:
                print(f"{name} exception {e!r}")
                raise
            finally:
                in_queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} is being cancelled")
        raise
    finally:
        print(f"{name} ended")

async def saver(out_queue):
    print("saver started")
    try:
        while True:
            num = await out_queue.get()
            try:
                print(f'saver got {num}')
                await asyncio.sleep(0)
                print("saver ended")
            except Exception as e:
                print(f"saver exception {e!r}")
                raise
            finally:
                out_queue.task_done()
    except asyncio.CancelledError:
        print(f"saver is being cancelled")
        raise
    finally:
        print(f"saver ended")

asyncio.run(run(), debug=True)
print('Done!')

This prints:

worker-0 started
worker-0 got 0
saver started
saver got 0
saver ended
done
worker-0 is being cancelled
worker-0 ended
saver is being cancelled
saver ended
Done!

If you want to use globals, to share queue objects, then use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops then the contextvars module integration will take care of keeping the queues separate:

from contextvars import ContextVar
# ...

in_queue = ContextVar('in_queue')
out_queue = ContextVar('out_queue')

async def run():
    in_, out = asyncio.Queue(), asyncio.Queue()
    in_queue.set(in_)
    out_queue.set(out)

    for request in range(1):
        await in_.put(request)

    # ...

    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    tasks.append(asyncio.create_task(saver()))

    await in_.join()
    await out.join()

    # ...

async def worker(name):
    print(f"{name} started")
    in_ = in_queue.get()
    out = out_queue.get()
    try:
        while True:
            num = await in_.get()
            try:
                # ...
                await out.put(num)
                # ...
            finally:
                in_.task_done()
    # ...

async def saver():
    print("saver started")
    out = out_queue.get()
    try:
        while True:
            num = await out.get()
            try:
                # ...
            finally:
                out.task_done()
    # ...
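For completeness, here is a runnable condensation of the sketch above, reduced to a single queue and worker (hypothetical names). It relies on the fact that asyncio.create_task() copies the current context, so the task sees the ContextVar value set before its creation:

```python
import asyncio
from contextvars import ContextVar

in_queue = ContextVar('in_queue')

async def worker(results):
    queue = in_queue.get()   # look up the queue bound in this context
    while True:
        num = await queue.get()
        try:
            results.append(num)
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()  # still created inside the running loop
    in_queue.set(queue)
    results = []
    task = asyncio.create_task(worker(results))  # copies the current context
    for n in range(3):
        await queue.put(n)
    await queue.join()
    task.cancel()
    return results

print(asyncio.run(main()))  # -> [0, 1, 2]
```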
