How to parallelize the for loop inside an async function and track for loop execution status?


Problem description

Recently, I asked a question regarding how to track the progress of a for loop inside a deployed API. Here's the link.

The solution code that worked for me:

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid


context = {'jobs': {}}

app = FastAPI()


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    for file_number, file in enumerate(iter_over):
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)  # the actual per-file work goes here
    jobs[job_key]['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())  # schedule do_work on the running loop without awaiting it

    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

This way, I can track the progress of the for loop inside do_work using the identifier, by calling the status method.
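To make the flow concrete, here is a minimal client-side sketch (not part of the original question) showing how the two endpoints are meant to be used together; the base URL and the use of the requests library are assumptions:

import time
import requests

BASE_URL = 'http://localhost:8000'  # assumed address of the running uvicorn server

# start a job and get its identifier
identifier = requests.get(f'{BASE_URL}/').json()['identifier']

# poll the status endpoint until the job reports 'done'
while True:
    status = requests.get(f'{BASE_URL}/status/{identifier}').json()['status']
    print(status)  # e.g. {'iteration': 42, 'status': 'inprogress'}
    if isinstance(status, dict) and status.get('status') == 'done':
        break
    time.sleep(1)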

Now, I am looking for a way to parallelize the for loop inside the do_work method.

But if I use joblib, I don't know how to track each file being processed; the iteration count becomes meaningless because all the files are processed in parallel.

Note: I just gave joblib as an example because I am not very familiar with other libraries. The processing on each file is fairly heavy CPU-bound work: I am preprocessing the file, loading 4 TensorFlow models, running predictions on the file, and writing the results to an SQL database.

If anyone knows a way I can do this, please share and help me out.

Recommended answer

I'm not 100% sure I understood, but would something like this work?

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'

This will run all of the work on the files in parallel*. Keep in mind that in this case job_info['iteration'] is mostly meaningless, since all of the tasks start together and increase the value together.

  • * This is asyncio-based concurrency, which means it is not truly parallel; the event loop just keeps jumping from one task to another (the short sketch below illustrates the difference).
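To see why this distinction matters, here is a small self-contained sketch (not from the original answer) contrasting tasks that await with tasks that block: the awaiting tasks overlap on the event loop, while blocking calls such as time.sleep() run one after another.

import asyncio
import time


async def io_bound(n):
    await asyncio.sleep(1)  # yields control back to the event loop
    print(f'awaiting task {n} done')


async def cpu_bound(n):
    time.sleep(1)  # blocks the whole event loop; nothing else runs meanwhile
    print(f'blocking task {n} done')


async def main():
    start = time.perf_counter()
    await asyncio.gather(*(io_bound(i) for i in range(3)))
    print(f'3 awaiting tasks took {time.perf_counter() - start:.1f}s')  # roughly 1s

    start = time.perf_counter()
    await asyncio.gather(*(cpu_bound(i) for i in range(3)))
    print(f'3 blocking tasks took {time.perf_counter() - start:.1f}s')  # roughly 3s


asyncio.run(main())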

Pay attention: it really matters what kind of work you actually want to perform on the files. If it is a CPU-bound job (calculations, analysis, etc.), as opposed to a mainly IO-bound job like web calls, then this is the wrong solution and needs to be tweaked a bit; if that is the case, let me know and I'll try to update it.

Updated version for CPU-bound work; progress shows files completed.

This is a relatively complete example, just without the actual server:

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor



jobs = {}
context = {}
executor = ProcessPoolExecutor()


def do_work_per_file(file, file_number):
    # CPU related work here, this method is not async
    # do the work on the file here
    print(f'Starting work on file {file_number}')
    time.sleep(random.randint(1,10) / 10)
    return file_number


async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['completed'] = 0

    loop = asyncio.get_running_loop()
    # run the CPU-bound work in the process pool, one task per file
    tasks = [loop.run_in_executor(executor, do_work_per_file, file, file_number)
             for file_number, file in enumerate(iter_over)]
    job_info['status'] = 'inprogress'
    for completed_job in asyncio.as_completed(tasks):
        print(f'Finished work on file {await completed_job}')
        job_info['completed'] += 1
        print('Current job status is ', job_info)

    jobs[job_key]['status'] = 'done'
    print('Current job status is ', job_info)

if __name__ == '__main__':
    context['jobs'] = jobs
    jobs['abc'] = {}
    asyncio.run(do_work('abc'))

The output is:

Starting work on file 0
Starting work on file 1
Starting work on file 2
Starting work on file 3
Starting work on file 4
Starting work on file 5
Starting work on file 6
Starting work on file 7
Starting work on file 8
Starting work on file 9
Starting work on file 10
Starting work on file 11
Starting work on file 12
Starting work on file 13
Starting work on file 14
Finished work on file 1
Current job status is  {'completed': 1, 'status': 'inprogress'}
Finished work on file 7
Current job status is  {'completed': 2, 'status': 'inprogress'}
Finished work on file 9
Current job status is  {'completed': 3, 'status': 'inprogress'}
Finished work on file 12
Current job status is  {'completed': 4, 'status': 'inprogress'}
Finished work on file 11
Current job status is  {'completed': 5, 'status': 'inprogress'}
Finished work on file 13
Current job status is  {'completed': 6, 'status': 'inprogress'}
Finished work on file 4
Current job status is  {'completed': 7, 'status': 'inprogress'}
Finished work on file 14
Current job status is  {'completed': 8, 'status': 'inprogress'}
Finished work on file 0
Current job status is  {'completed': 9, 'status': 'inprogress'}
Finished work on file 6
Current job status is  {'completed': 10, 'status': 'inprogress'}
Finished work on file 2
Current job status is  {'completed': 11, 'status': 'inprogress'}
Finished work on file 3
Current job status is  {'completed': 12, 'status': 'inprogress'}
Finished work on file 8
Current job status is  {'completed': 13, 'status': 'inprogress'}
Finished work on file 5
Current job status is  {'completed': 14, 'status': 'inprogress'}
Finished work on file 10
Current job status is  {'completed': 15, 'status': 'inprogress'}
Current job status is  {'completed': 15, 'status': 'done'}

Basically, what changed is that you are now opening a new process pool that handles the work on the files. Being a separate process also means that the CPU-intensive work will not block your event loop or stop you from querying the status of the job.
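For completeness, here is a hedged sketch of how this process-pool version might be plugged back into the FastAPI app from the question. The endpoint paths mirror the original code; everything else (in particular using asyncio.create_task to schedule the job) is an assumption, not the answer author's exact server code.

import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor

from fastapi import FastAPI

context = {'jobs': {}}
executor = ProcessPoolExecutor()
app = FastAPI()


def do_work_per_file(file, file_number):
    # the CPU-bound work (preprocessing, model inference, DB writes) goes here
    return file_number


async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    job_info = context['jobs'][job_key]
    job_info['completed'] = 0
    job_info['status'] = 'inprogress'
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor, do_work_per_file, file, file_number)
             for file_number, file in enumerate(iter_over)]
    for completed_job in asyncio.as_completed(tasks):
        await completed_job
        job_info['completed'] += 1
    job_info['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.create_task(do_work(identifier))  # schedule the job without awaiting it
    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }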

