一次处理N个工作的Asyncio工人? [英] Asyncio worker that handles N jobs at a time?

查看:16
本文介绍了一次处理N个工作的Asyncio工人?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个asyncio工作者类,它将消耗作业队列中的作业,并并行处理多达N个作业。某些作业可能会对其他作业进行排队。当作业队列为空并且工作进程完成其所有当前作业时,它应该结束。

从概念上讲,我仍在与asyncio作斗争。以下是我的一个尝试,其中N=3

import asyncio, logging, random

async def do_work(id_):
    await asyncio.sleep(random.random())
    return id_

class JobQueue:
    ''' Maintains a list of all pendings jobs. '''
    def __init__(self):
        self._queue = asyncio.Queue()
        self._max_id = 10
        for id_ in range(self._max_id):
            self._queue.put_nowait(id_ + 1)

    def add_job(self):
        self._max_id += 1
        self._queue.put_nowait(self._max_id)

    async def get_job(self):
        return await self._queue.get()

    def has_jobs(self):
        return self._queue.qsize() > 0

class JobWorker:
    ''' Processes up to 3 jobs at a time in parallel. '''
    def __init__(self, job_queue):
        self._current_jobs = set()
        self._job_queue = job_queue
        self._semaphore = asyncio.Semaphore(3)

    async def run(self):
        while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
            print('Acquiring semaphore...')
            await self._semaphore.acquire()
            print('Getting a job...')
            job_id = await self._job_queue.get_job()
            print('Scheduling job {}'.format(job_id))
            self._current_jobs.add(job_id)
            task = asyncio.Task(do_work(job_id))
            task.add_done_callback(self.task_finished)

    def task_finished(self, task):
        job_id = task.result()
        print('Finished job {} / released semaphore'.format(job_id))
        self._current_jobs.remove(job_id)
        self._semaphore.release()
        if random.random() < 0.2:
            print('Queuing a new job')
            self._job_queue.add_job()

loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()

输出摘录:

Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2 / released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11 / released semaphore
Getting a job...
Finished job 12 / released semaphore
Finished job 13 / released semaphore
它似乎可以正确处理所有作业,但一次处理的作业不超过3个。但是,程序在最后一个作业完成后挂起。如输出所示,它似乎挂在job_id = await self._job_queue.get_job()处。一旦作业队列为空,此协同例程将永远不会继续,并且不会再次执行检查作业队列是否为空(在循环的顶部)。

我尝试了许多方法来解决这个问题,但从概念上讲,有些东西不太适合。我当前的WIP是在队列和工作者之间传递一些期货,然后在所有队列和工作者上使用asyncio.wait(...)的某种组合,但它变得越来越难看,我想知道是否有我忽略的优雅解决方案。

推荐答案

您可以利用queue.task_done表示以前入队的任务已完成。然后可以使用asyncio.wait组合queue.joinqueue.get:如果queue.join完成而queue.get没有完成,这意味着所有作业都已完成。

请参见此示例:

class Worker:

    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task], return_when='FIRST_COMPLETED')
        if get_task.done():
            return task.result()

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()

这篇关于一次处理N个工作的Asyncio工人?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆