Asyncio in coroutine RuntimeError: no running event loop

Problem description

I'm writing multi-process code, which runs perfectly in Python 3.7. I want one of the parallel processes to execute an I/O step that takes forever, using AsyncIO in order to get better performance, but I have not been able to get it to run.

Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed)

The method in particular runs as expected using multithreading, which is what I want to replace with AsyncIO.

I have googled and tried creating the loop in the main() function, and now only in the intended coroutine; I have looked at examples and read about this new async way of getting things done, with no results so far.

The following is the app.py code, which is executed with: python app.py

import sys
import traceback
import logging
import pdb
import asyncio

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

Here is the code where I have the given class defined:

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

The method containing the async code is here:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

And the method that calls this particular sequence, including the async method, is:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

These are the errors I get:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)

Recommended answer

As can be seen from the traceback log, it looks like you are trying to add tasks to an event loop that is not running.

/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited

The loop was just created and is not running yet, therefore asyncio is unable to attach tasks to it.

The following example reproduces the same result: tasks are added and then we try to wait for all of them to finish:

import asyncio
async def func(num):
    print('My name is func {0}...'.format(num))

loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

The result:

Traceback (most recent call last):
  File "C:/tmp/stack_overflow.py", line 42, in <module>
    tasks.append(asyncio.create_task(func(i)))
  File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited

Nonetheless, the solution is pretty simple: you just need to add the tasks to the loop you created, instead of asking asyncio to do it. The only change needed is in the following line:

tasks.append(asyncio.create_task(func(i)))

Change the creation of the task from asyncio to the newly created loop. You are able to do this because it is your own loop, unlike asyncio, which searches for a running one.

So the new line should look like this:

tasks.append(loop.create_task(func(i)))
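
Put together, the fixed reproduction script would look roughly like this (a minimal sketch of this first approach, reusing the same func coroutine and five tasks from the example above):

import asyncio

async def func(num):
    print('My name is func {0}...'.format(num))

# Create an event loop explicitly; it is not running yet.
loop = asyncio.get_event_loop()

tasks = list()
for i in range(5):
    # Schedule the coroutine on our own loop instead of asking asyncio
    # for a running loop (there is none at this point).
    tasks.append(loop.create_task(func(i)))

# run_until_complete() starts the loop, so the scheduled tasks can now run.
loop.run_until_complete(asyncio.wait(tasks))
loop.close()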

Another solution could be to run an async function and create the tasks inside it (because that loop is already running by then, asyncio is able to attach tasks to it):

import asyncio
async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))

loop = asyncio.get_event_loop()
async def create_tasks_func():
    tasks = list()
    for i in range(5):
        tasks.append(asyncio.create_task(func(i)))
    await asyncio.wait(tasks)
loop.run_until_complete(create_tasks_func())
loop.close()

This simple change results in:

Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
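
For completeness, here is roughly how this second approach could be applied to the asker's _get_filter_collateral method. This is only a sketch, not the accepted code: pandas, StringIO and the blob-client attributes are assumed from the question, and since each worker process has no running loop of its own, asyncio.run() is used to create one around a wrapper coroutine:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()

        async def read_task(file_) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        async def read_all() -> None:
            # The loop is running inside this coroutine, so
            # asyncio.create_task() can find it.
            tasks = [asyncio.create_task(read_task(f)) for f in all_files]
            await asyncio.wait(tasks)

        # asyncio.run() creates, runs and closes a fresh event loop
        # in this worker process.
        asyncio.run(read_all())

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

Note that read_task itself never awaits anything, so this removes the RuntimeError but does not add real concurrency; the CSV parsing still runs one file at a time.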
