asyncio/aiohttp - create_task() blocks event loop, gather results in "This event loop is already running"


Question

I cannot get both my consumer and my producer running at the same time; it seems worker(), or the aiohttp server, is blocking - even when executed concurrently with asyncio.gather().

If I instead do loop.create_task(worker), this blocks and the server is never started.

I've tried every variation I can imagine, including the nest_asyncio module - and I can only ever get one of the two components running.

What am I doing wrong?

async def worker():
    batch_size = 30

    print("running worker")
    while True:
        if queue.qsize() > 0:
            future_map = {}

            size = min(queue.qsize(), batch_size)
            batch = []
            for _ in range(size):
                item = await queue.get()
                print("Item: "+str(item))
                future_map[item["fname"]] = item["future"]
                batch.append(item)

            print("processing", batch)
            results = await process_files(batch)
            for dic in results:
                for key, value in dic.items():
                    print(str(key)+":"+str(value))
                    future_map[key].set_result(value)

            # mark the tasks done
            for _ in batch:
                queue.task_done()



def start_worker():
    loop.create_task(worker())

def create_app():
    app = web.Application()
    routes = web.RouteTableDef()
    @routes.post("/decode")
    async def handle_post(request):
        return await decode(request)
    app.add_routes(routes)
    app.on_startup.append(start_worker())
    return app

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    web.run_app(app)

The above prints "running worker" and never starts the aiohttp server.

def run(loop, app, port=8001):
    handler = app.make_handler()
    f = loop.create_server(handler, '0.0.0.0', port)
    srv = loop.run_until_complete(f)
    print('serving on', srv.sockets[0].getsockname())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(handler.finish_connections(1.0))
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(app.finish())
    loop.close()

def main(app):
    asyncio.gather(run(loop, app), worker())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    main(app)

The above starts the server, but not the worker.

Answer

Although await asyncio.sleep(0) fixes the immediate issue, it's not an ideal fix; in fact, it's somewhat of an anti-pattern. To understand why, let's examine why the problem occurs in more detail. The heart of the matter is the worker's while loop - once the queue becomes empty, it effectively boils down to:

while True:
    pass

Sure, the part marked as pass contains a check of qsize() that executes additional code when the queue is non-empty, but once qsize() first reaches 0, that check will always evaluate to false. This is because asyncio is single-threaded: when qsize() == 0, the while loop no longer encounters a single await. Without an await, it's impossible to relinquish control to a coroutine or callback that might populate the queue, and the while loop becomes infinite.
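The starvation is easy to reproduce in a self-contained sketch (the names busy_worker and demo are mine, not from the question). The producer is scheduled first, yet the spinning worker never sees its item, because the worker never suspends and the loop never gets a chance to run the producer:

```python
import asyncio

async def producer(queue):
    # Ready to run as soon as the event loop regains control.
    await queue.put("item")

async def busy_worker(queue, max_spins=100_000):
    # Polls qsize() without ever awaiting. asyncio is single-threaded,
    # so control never returns to the event loop, the already-scheduled
    # producer cannot run, and qsize() stays 0 for the whole spin.
    spins = 0
    while queue.qsize() == 0 and spins < max_spins:
        spins += 1
    return queue.qsize()

async def demo():
    queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))
    size_during_spin = await busy_worker(queue)  # runs inline, never suspends
    await task  # first real suspension point: only now does producer run
    return size_during_spin, queue.qsize()

print(asyncio.run(demo()))  # → (0, 1)
```

The spin is bounded by max_spins only so the demo terminates; the question's worker has no such bound, which is why it hangs forever.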

This is why await asyncio.sleep(0) inside the loop helps: it forces a context switch, guaranteeing that other coroutines get a chance to run and eventually re-populate the queue. However, it also keeps the while loop constantly running, which means the event loop never goes to sleep, even if the queue remains empty for hours on end. The event loop stays in a busy-wait state for as long as the worker is active. You could alleviate the busy-wait by raising the sleep interval to a non-zero value, as suggested by dirn, but that introduces latency and still doesn't let the event loop sleep when there's no activity.
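The latency cost of interval polling can be measured directly. In this sketch (polling_worker and demo are hypothetical names), an item that arrives 10 ms into a 50 ms poll interval sits in the queue until the next poll:

```python
import asyncio
import time

async def polling_worker(queue, interval=0.05):
    # Polls on a fixed interval: the loop can sleep between polls,
    # but an item may sit in the queue for up to `interval` seconds.
    while True:
        if queue.qsize() > 0:
            return await queue.get()
        await asyncio.sleep(interval)

async def demo():
    queue = asyncio.Queue()
    worker = asyncio.create_task(polling_worker(queue))
    await asyncio.sleep(0.01)   # item arrives mid-interval
    await queue.put("item")
    produced_at = time.monotonic()
    item = await worker
    latency = time.monotonic() - produced_at
    return item, latency

item, latency = asyncio.run(demo())
print(item, f"picked up after {latency * 1000:.0f} ms")
```

The observed latency approaches the full poll interval (~40 ms here), whereas an awaiting consumer would wake as soon as the item is put.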

The proper fix is not to check qsize(), but to use queue.get() to obtain the next item. This sleeps as long as needed until an item appears, and immediately wakes the coroutine once it does. Don't worry that this will "block" the worker - it's precisely the point of asyncio that you can have multiple coroutines, and one being "blocked" on an await simply allows the others to proceed. For example:

async def worker():
    batch_size = 30

    while True:
        # wait for an item and add it to the batch
        batch = [await queue.get()]
        # batch up more items if available
        while not queue.empty() and len(batch) < batch_size:
            batch.append(await queue.get())
        # process the batch
        future_map = {item["fname"]: item["future"] for item in batch}
        results = await process_files(batch)
        for dic in results:
            for key, value in dic.items():
                print(str(key)+":"+str(value))
                future_map[key].set_result(value)
        for _ in batch:
            queue.task_done()

In this variant we await something in every iteration of the loop, and no sleeping is needed.
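Separately, the question's create_app has a second problem: it calls start_worker() and appends its return value (None), whereas aiohttp's on_startup expects an async callable that receives the app. A minimal sketch of how the fixed worker could be wired in - storing the queue on the app is my choice, not part of the original code:

```python
import asyncio
from aiohttp import web

async def worker(queue, batch_size=30):
    while True:
        # Block until at least one item is available, then batch up more.
        batch = [await queue.get()]
        while not queue.empty() and len(batch) < batch_size:
            batch.append(await queue.get())
        # ... process the batch and resolve futures here ...
        for _ in batch:
            queue.task_done()

async def start_worker(app):
    # on_startup callbacks receive the app and run inside the server's
    # event loop, so create_task() is safe here.
    app["worker"] = asyncio.create_task(worker(app["queue"]))

def create_app():
    app = web.Application()
    app["queue"] = asyncio.Queue()
    app.on_startup.append(start_worker)  # append the callable, don't call it
    return app

if __name__ == "__main__":
    web.run_app(create_app())
```

Handlers can then reach the queue via request.app["queue"] instead of a global.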

