Asyncio worker that handles N jobs at a time?
Question
I'm trying to write an asyncio worker class that consumes jobs from a job queue and processes up to N jobs in parallel. Some jobs may enqueue additional jobs. When the job queue is empty and the worker has finished all of its current jobs, it should stop. Conceptually I'm still struggling with asyncio. Below is one of my attempts, with N=3:
import asyncio, logging, random

async def do_work(id_):
    await asyncio.sleep(random.random())
    return id_

class JobQueue:
    ''' Maintains a list of all pending jobs. '''
    def __init__(self):
        self._queue = asyncio.Queue()
        self._max_id = 10
        for id_ in range(self._max_id):
            self._queue.put_nowait(id_ + 1)

    def add_job(self):
        self._max_id += 1
        self._queue.put_nowait(self._max_id)

    async def get_job(self):
        return await self._queue.get()

    def has_jobs(self):
        return self._queue.qsize() > 0

class JobWorker:
    ''' Processes up to 3 jobs at a time in parallel. '''
    def __init__(self, job_queue):
        self._current_jobs = set()
        self._job_queue = job_queue
        self._semaphore = asyncio.Semaphore(3)

    async def run(self):
        while self._job_queue.has_jobs() or len(self._current_jobs) > 0:
            print('Acquiring semaphore...')
            await self._semaphore.acquire()
            print('Getting a job...')
            job_id = await self._job_queue.get_job()
            print('Scheduling job {}'.format(job_id))
            self._current_jobs.add(job_id)
            task = asyncio.Task(do_work(job_id))
            task.add_done_callback(self.task_finished)

    def task_finished(self, task):
        job_id = task.result()
        print('Finished job {} / released semaphore'.format(job_id))
        self._current_jobs.remove(job_id)
        self._semaphore.release()
        if random.random() < 0.2:
            print('Queuing a new job')
            self._job_queue.add_job()

loop = asyncio.get_event_loop()
jw = JobWorker(JobQueue())
print('Starting event loop')
loop.run_until_complete(jw.run())
print('Event loop ended')
loop.close()
Excerpt of the output:
Starting event loop
Acquiring semaphore...
Getting a job...
Scheduling job 1
Acquiring semaphore...
Getting a job...
Scheduling job 2
Acquiring semaphore...
Getting a job...
Scheduling job 3
Acquiring semaphore...
Finished job 2 / released semaphore
Getting a job...
Scheduling job 4
...snip...
Acquiring semaphore...
Finished job 11 / released semaphore
Getting a job...
Finished job 12 / released semaphore
Finished job 13 / released semaphore
It appears to process all jobs correctly, never more than 3 at a time. However, the program hangs after the last job finishes. As the output suggests, it hangs at job_id = await self._job_queue.get_job(). Once the job queue is empty, that coroutine never resumes, so the empty-queue check (at the top of the loop) is never reached again.
I've tried to work around this in a number of ways, but conceptually something doesn't quite fit. My current work in progress passes some futures between the queue and the worker and then uses some combination of asyncio.wait(...) on all of them, but it's getting ugly, and I wonder whether there is an elegant solution I'm overlooking.
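The hang can be reproduced in isolation (a minimal sketch, not from the question): a bare queue.get() on an empty queue waits forever, so nothing ever re-checks the loop condition. Here the get is bounded with a timeout just to make the hang observable:

```python
import asyncio

async def demo():
    q = asyncio.Queue()  # empty, and nothing will ever be put into it
    try:
        # get() would block forever; wait_for bounds it so the hang is visible
        await asyncio.wait_for(q.get(), timeout=0.1)
        return 'got item'
    except asyncio.TimeoutError:
        return 'get() never returned'

result = asyncio.run(demo())
print(result)
```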
Answer
You can take advantage of queue.task_done to signal that a previously enqueued job is complete. queue.join and queue.get can then be combined with asyncio.wait: if queue.join finishes while queue.get does not, it means all the jobs have completed.
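The key primitive here is the queue's unfinished-task counter (a minimal sketch with illustrative names, not part of the original answer): queue.join() stays blocked even after get() removes an item, until the matching task_done() call:

```python
import asyncio

async def demo():
    q = asyncio.Queue()
    q.put_nowait('job')
    await q.get()                      # item removed, but not yet marked done
    join_task = asyncio.ensure_future(q.join())
    await asyncio.sleep(0)             # let join() start waiting
    still_blocked = not join_task.done()
    q.task_done()                      # now the item counts as processed
    await join_task                    # join() completes immediately
    return still_blocked

blocked_before_task_done = asyncio.run(demo())
print(blocked_before_task_done)
```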
See this example:
class Worker:
    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return  # join() won the race: all jobs are done
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        # Race a get() against a join(); if join() finishes first,
        # the queue has drained and every job has called task_done().
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task],
                           return_when=asyncio.FIRST_COMPLETED)
        if get_task.done():
            return get_task.result()

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()
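Putting the class to work (a usage sketch: the driver code, the do_work job function, and the cancellation of the losing task in _get are my additions, not part of the original answer): jobs can enqueue further jobs, and run() returns only once everything has drained:

```python
import asyncio
import random

# The Worker from the answer, reproduced so this sketch runs standalone,
# with the leftover task from each get/join race cancelled for cleanliness.
class Worker:
    def __init__(self, func, n=3):
        self.func = func
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(n)

    def put(self, *args):
        self.queue.put_nowait(args)

    async def run(self):
        while True:
            args = await self._get()
            if args is None:
                return  # join() won the race: all jobs are done
            asyncio.ensure_future(self._target(args))

    async def _get(self):
        get_task = asyncio.ensure_future(self.queue.get())
        join_task = asyncio.ensure_future(self.queue.join())
        await asyncio.wait([get_task, join_task],
                           return_when=asyncio.FIRST_COMPLETED)
        if get_task.done():
            join_task.cancel()
            return get_task.result()
        get_task.cancel()  # avoid a stray pending get() at shutdown

    async def _target(self, args):
        try:
            async with self.semaphore:
                return await self.func(*args)
        finally:
            self.queue.task_done()

results = []

async def do_work(worker, id_):
    # At most 3 of these run concurrently, gated by the semaphore.
    await asyncio.sleep(random.random() / 10)
    results.append(id_)
    if id_ == 1:
        worker.put(worker, 99)  # a running job can enqueue further jobs

async def main():
    worker = Worker(do_work, n=3)
    for id_ in range(1, 6):
        worker.put(worker, id_)
    await worker.run()  # returns once the queue is drained and all jobs finished

asyncio.run(main())
print(sorted(results))  # every job ran, including the dynamically added one
```

Note that run() never blocks on an empty queue: the join() branch of the race is what turns "queue empty and all in-flight jobs finished" into a return value, which is exactly the termination condition the question was struggling to express.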