Python Asyncio-RuntimeError:无法关闭正在运行的事件循环 [英] Python Asyncio - RuntimeError: Cannot close a running event loop

查看:620
本文介绍了Python Asyncio-RuntimeError:无法关闭正在运行的事件循环的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试解决此错误: RuntimeError:无法在asyncio进程中关闭正在运行的事件循环。我相信这是因为在任务仍未完成时发生故障,然后尝试关闭事件循环。我想我需要在关闭事件循环之前等待其余的响应,但是我不确定如何在我的特定情况下正确完成该操作。

I'm trying to resolve this error: RuntimeError: Cannot close a running event loop in my asyncio process. I believe it's happening because there's a failure while tasks are still pending, and then I try to close the event loop. I'm thinking I need to await the remaining responses prior to closing the event loop, but I'm not sure how to accomplish that correctly in my specific situation.

 def start_job(self):

        if self.auth_expire_timestamp < get_timestamp():
            api_obj = api_handler.Api('Api Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()


        try:
            self.queue_manager(self.do_stuff(json_data))
        except aiohttp.ServerDisconnectedError as e:
            logging.info("Reconnecting...")
            api_obj = api_handler.Api('API Name', self.dbObj)
            self.api_auth_resp = api_obj.get_auth_response()
            self.api_attr = api_obj.get_attributes()
            self.run_eligibility()

async def do_stuff(self, data):

    tasks = []

    async with aiohttp.ClientSession() as session:
        for row in data:
            task = asyncio.ensure_future(self.async_post('url', session, row))
            tasks.append(task)
        result = await asyncio.gather(*tasks)
    self.load_results(result)


def queue_manager(self, method):
    self.loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    self.loop.run_until_complete(future)


async def async_post(self, resource, session, data):
        async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
            resp = []
            try:
                headers = response.headers['foo']
                content = await response.read()
                resp.append(headers)
                resp.append(content)
            except KeyError as e:
                logging.error('KeyError at async_post response')
                logging.error(e)
        return resp


def shutdown(self):
    //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
    self.loop.close() 
    return True

我该如何处理错误并正确关闭事件循环,以便我可以启动一个新程序并从本质上重新启动整个程序并继续。

How can I handle the error and properly close the event loop so I can start a new one and essentially re-boot the whole program and continue on.

编辑:

这是我现在正在尝试的,基于此答案。不幸的是,这种错误很少发生,因此,除非我可以强制执行,否则我将不得不等待并观察其是否有效。在我的 queue_manager 方法中,将其更改为:

This is what I'm trying now, based on this SO answer. Unfortunately, this error only happens rarely, so unless I can force it, i will have to wait and see if it works. In my queue_manager method I changed it to this:

try:
   self.loop.run_until_complete(future)
except Exception as e:
   future.cancel()
   self.loop.run_until_complete(future)
   future.exception()

更新:

我摆脱了 shutdown()方法,并将其添加到我的 queue_manager()方法中,似乎没有问题:

I got rid of the shutdown() method and added this to my queue_manager() method instead and it seems to be working without issue:

  try:
        self.loop.run_until_complete(future)
    except Exception as e:
        future.cancel()
        self.check_in_records()
        self.reconnect()
        self.start_job()
        future.exception()


推荐答案

要回答最初提出的问题,没有必要到 close()一个正在运行的循环,您可以为整个程序重用同一循环。

To answer the question as originally stated, there is no need to close() a running loop, you can reuse the same loop for the whole program.

给出更新中的代码,您的 queue_manager 可能看起来像这样:

Given the code in the update, your queue_manager could look like this:

try:
    self.loop.run_until_complete(future)
except Exception as e:
    self.check_in_records()
    self.reconnect()
    self.start_job()

取消未来是没有必要的据我所知没有任何作用。这不同于引用的答案,后者专门对 KeyboardInterrupt 做出反应,特殊,因为它是由asyncio本身引发的。 KeyboardInterrupt 可以通过 run_until_complete 传播,而无需将来真正完成。在asyncio中正确处理 Ctrl-C 是非常困难的,甚至是不可能的(请参阅此处了解详细信息),但幸运的是,问题根本不是关于 Ctrl-C 的问题,而是协程引发的异常。 (请注意, KeyboardInterrupt 不会从 Exception 继承,因此对于 Ctrl-C 除外主体甚至不会执行。)

Cancelling future is not necessary and as far as I can tell has no effect. This is different from the referenced answer which specifically reacts to KeyboardInterrupt, special because it is raised by asyncio itself. KeyboardInterrupt can be propagated by run_until_complete without the future having actually completed. Handling Ctrl-C correctly in asyncio is very hard or even impossible (see here for details), but fortunately the question is not about Ctrl-C at all, it is about exceptions raised by the coroutine. (Note that KeyboardInterrupt doesn't inherit from Exception, so in case of Ctrl-C the except body won't even execute.)


我正在取消未来,因为在这种情况下还有其他待处理的任务,我想基本上删除了这些任务并开始了新的事件循环。

I was canceling the future because in this instance there are remaining tasks pending and i want to essentially remove those tasks and start a fresh event loop.

这是正确的做法,但是其中的代码(已更新)问题只是取消了一个未来,一个已经传递给 run_until_complete 的未来。回想一下,future是将在以后提供的结果值的占位符。提供值后,可以通过调用 future.result()来检索它。如果将来的值是一个例外,则 future.result()将引发该例外。 run_until_complete 具有合同,它将在给定的未来产生值之前一直运行事件循环,然后返回该值。如果值实际上是要筹集的例外,则 run_until_complete 将重新筹集它。例如:

This is a correct thing to want to do, but the code in the (updated) question is only canceling a single future, the one already passed to run_until_complete. Recall that a future is a placeholder for a result value that will be provided at a later point. Once the value is provided, it can be retrieved by calling future.result(). If the "value" of the future is an exception, future.result() will raise that exception. run_until_complete has the contract that it will run the event loop for as long as it takes for the given future to produce a value, and then it returns that value. If the "value" is in fact an exception to raise, then run_until_complete will re-raise it. For example:

loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)

当有问题的未来实际上是 Task ,一个特定于异步的对象,将协程包装成未来,这种未来的结果就是协程返回的对象。如果协程引发异常,则检索结果将重新引发,因此 run_until_complete

When the future in question is in fact a Task, an asyncio-specific object that wraps a coroutine into a Future, the result of such future is the object returned by the coroutine. If the coroutine raises an exception, then retrieving the result will re-raise it, and so will run_until_complete:

async def fail():
    1/0

loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)

在处理任务时, run_until_complete 完成意味着协程也已经完成,或者返回了一个值或引发异常,具体取决于 run_until_complete 返回或引发。

When dealing with a task, run_until_complete finishing means that the coroutine has finished as well, having either returned a value or raised an exception, as determined by run_until_complete returning or raising.

另一方面,取消任务是可行的通过安排要恢复的任务和 await 表达式将其暂停以引发 CancelledError 。除非任务专门捕获并抑制了此异常(行为正常的异步代码不应该这样做),否则任务将停止执行,并且 CancelledError 将成为其结果。但是,如果在调用 cancel()时协程已经完成,则 cancel()不能执行任何操作,因为在那里没有待处理的 await CancelledError 注入。

On the other hand, cancelling a task works by arranging for the task to be resumed and the await expression that suspended it to raise CancelledError. Unless the task specifically catches and suppresses this exception (which well-behaved asyncio code is not supposed to do), the task will stop executing and the CancelledError will become its result. However, if the coroutine is already finished when cancel() is called, then cancel() cannot do anything because there is no pending await to inject CancelledError into.

这篇关于Python Asyncio-RuntimeError:无法关闭正在运行的事件循环的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆