multiprocessing - Cancel remaining jobs in a pool without destroying the Pool

Question

I'm using map_async to create a pool of 4 workers and giving it a list of image files to process [Set 1].
At times, I need to cancel the processing partway through so that I can instead get a different set of files processed [Set 2].

So an example situation is: I give map_async 1000 files to process, and then want to cancel the processing of the remaining jobs after about 200 files have been processed.
Additionally, I want to do this cancellation without destroying/terminating the pool. Is this possible?

I do not want to terminate the pool, because recreating the pool is a slow process on Windows (where it uses 'spawn' instead of 'fork'), and I need to use this same pool for processing a different set of image files [Set 2].

import multiprocessing
from multiprocessing import Pool

# Putting job_set1 through processing. It may consist of 1000 images.
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)

Now, in between, I need to cancel the processing of this Set 1 and move on to a different set (waiting for all 1000 images to complete processing is not an option, but I can wait for the currently processing image to finish).

<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)

Answer

It's time for the fundamental theorem of software engineering: while multiprocessing.Pool doesn't supply cancellation as a feature, we can add it by having a Pool read from a carefully crafted iterable. It's not enough, however, to have a generator that yields values from a list but stops short on some signal, because the Pool eagerly drains any generator given to it. So we need a very carefully crafted iterable.
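
To see why that isn't sufficient, here is a minimal sketch of the naive approach (stop_flag is a made-up name, not part of the solution below):

import threading

stop_flag=threading.Event()   # hypothetical cancellation signal

def naive_feed(items):
  # Looks like it should stop early, but the Pool's task-handler thread
  # drains this generator almost immediately, so by the time stop_flag is
  # set, the remaining items have usually already been queued for workers.
  for x in items:
    if stop_flag.is_set(): return
    yield x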

The generic tool we need is a way to construct tasks for a Pool only when a worker becomes available (or at most one task ahead, in case constructing them takes significant time). The basic idea is to slow down the thread collecting work for the Pool with a semaphore upped only when a task is finished. (We know such a thread exists from the observable behavior of imap_unordered.)

import multiprocessing
from threading import Semaphore

size=multiprocessing.cpu_count()  # or whatever Pool size to use

# How many workers are waiting for work?  Add one to buffer one task.
work=Semaphore(size)

def feed0(it):
  it=iter(it)
  try:
    while True:
      # Don't ask the iterable until we have a customer, in case better
      # instructions become available:
      work.acquire()
      yield next(it)
  except StopIteration: pass
  work.release()
def feed(p,f,it):
  import sys,traceback
  iu=p.imap_unordered(f,feed0(it))
  while True:
    try: x=next(iu)
    except StopIteration: return
    except Exception:
      # A failed task still frees a worker; keep the semaphore count right,
      # but there is no result to yield for it.
      traceback.print_exception(*sys.exc_info())
      work.release()
      continue
    work.release()
    yield x

The try in feed prevents failures in the children from breaking the semaphore's count, but note that it does not protect against failures in the parent.
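
If parent-side failures matter too, one option (a sketch under the assumption that the Pool is simply discarded when the consuming loop dies; pool, f, inputs and handle are placeholder names) is:

try:
  for x in feed(pool,f,inputs):
    handle(x)                     # hypothetical consumer of results
except Exception:
  pool.terminate()                # the semaphore count can no longer be trusted
  raise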

Now that we have real-time control over the Pool's input, implementing whatever scheduling policy we like is straightforward. For example, here's something like itertools.chain but with the ability to asynchronously discard any remaining elements from one of the input sequences:

import collections,queue

class Cancel:
  closed=False
  def __init__(self):
    self.cur=collections.deque()   # sequence currently being consumed
    self.data=queue.Queue()        # queue of pending deques
  def add(self,d):
    d=collections.deque(d)
    self.data.put(d)
    return d                       # keep this handle to cancel the sequence later
  def __iter__(self):
    while True:
      try: yield self.cur.popleft()
      except IndexError:           # current sequence exhausted (or cancelled)
        self.cur=self.data.get()   # block until another add() or close()
        if self.cur is None: break
  @staticmethod
  def cancel(d): d.clear()
  def close(self): self.data.put(None)

This is thread-safe (in CPython at least) despite the lack of locking because operations like deque.clear are atomic with respect to Python inspection (and we don't separately check whether self.cur is empty).

Using one of these looks like:

pool=multiprocessing.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
  if happy_with(x): can.cancel(many)  # straight onto few, then out

where of course the adds and close could themselves be in the loop.
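
Mapping this back to the question, a minimal sketch might look like the following; process_image, set1_files and set2_files are assumed names, not from the original post. Both sets are queued up front on the same Pool, and cancelling Set 1 after roughly 200 results sends the workers straight on to Set 2 without recreating the Pool.

pool=multiprocessing.Pool(size)
can=Cancel()
set1=can.add(set1_files)            # Set 1: e.g. 1000 image paths
set2=can.add(set2_files)            # handle kept in case Set 2 must be cancelled too
can.close()
done=0
for out in feed(pool,process_image,can):
  done+=1
  if done==200: can.cancel(set1)    # drop the remaining Set 1 items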
