How to add a pool of processes available for a multiprocessing queue


Question

I am following a preceding question here: how to add more items to a multiprocessing queue while script in motion

The code I am working with now:

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))


def worker(q):
    while True:
        obj = q.get()
        if obj is None:
            break
        obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))
    queue.put(MyFancyClass('Frankie'))
    # print(queue.qsize())
    queue.put(None)

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

Right now, there are two items in the queue. If I replace those two lines with a list of, say, 50 items, how do I initiate a Pool to make a number of processes available? For example:

p = multiprocessing.Pool(processes=4)

Where does that go? I'd like to be able to run multiple items at once, especially if the items run for a while. Thanks!

Answer

As a rule, you either use a Pool, or Process(es) plus Queues. Mixing the two is a misuse; a Pool already uses Queues (or a similar mechanism) behind the scenes.
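
For completeness, if you stay with the Process-plus-Queue design, a "pool" is just several Process objects consuming from the same Queue, with one None sentinel per worker so every loop terminates. A minimal sketch, reusing worker and MyFancyClass from the question (the worker count of 4 is an arbitrary choice):

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    # Start a fixed number of workers, all reading from the same queue
    num_workers = 4
    workers = [multiprocessing.Process(target=worker, args=(queue,))
               for _ in range(num_workers)]
    for p in workers:
        p.start()

    # Enqueue as many items as you like (e.g. your list of 50)
    for name in ['Fancy Dan', 'Frankie']:
        queue.put(MyFancyClass(name))

    # One sentinel per worker, so each worker's loop breaks exactly once
    for _ in range(num_workers):
        queue.put(None)

    queue.close()
    queue.join_thread()
    for p in workers:
        p.join()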

If you want to do this with a Pool, change your code to the following (moving the code into a main function, both for performance and for better resource cleanup than running in the global scope):

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...] # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Submit all the work
        futures = [p.apply_async(fancy.do_something) for fancy in myfancyclasses]

        # Done submitting, let workers exit as they run out of work
        p.close()

        # Wait until all the work is finished
        for f in futures:
            f.wait()

if __name__ == '__main__':
    main()
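
(A side note: apply_async actually returns AsyncResult objects, named futures above. If you also want exceptions raised in the workers to propagate to the parent, call f.get() instead of f.wait(); get() re-raises any exception from the remote call, while wait() only blocks until the result is ready.)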

This could be simplified further, at the expense of purity, with Pool's *map* family of methods; e.g., to minimize memory usage, redefine main as:

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...] # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # No return value, so we ignore it, but we need to run out the result
        # or the work won't be done
        for _ in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            pass
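
(Passing the plain function MyFancyClass.do_something with the instances as its arguments works because in Python 3, MyFancyClass.do_something(obj) is exactly equivalent to obj.do_something().)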

Yes, technically either approach has slightly higher overhead, in that it needs to serialize the return value you're not using so it can be given back to the parent process. But in practice this cost is pretty low (since your function has no return, it returns None, which serializes to almost nothing). An advantage of this approach: you generally don't want to print to the screen from the child processes (their output would end up interleaved), so you can replace the prints with returns and let the parent do that work, e.g.:

import multiprocessing

class MyFancyClass:
    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        # Changed from print to return
        return 'Doing something fancy in {} for {}!'.format(proc_name, self.name)

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...] # define your MyFancyClass instances here
    with multiprocessing.Pool(processes=4) as p:
        # Using the return value now to avoid interleaved output
        for res in p.imap_unordered(MyFancyClass.do_something, myfancyclasses):
            print(res)

if __name__ == '__main__':
    main()

Note how all of these solutions remove the need to write your own worker function or manually manage Queues; the Pool does that grunt work for you.

An alternate approach uses concurrent.futures to efficiently process results as they become available, while letting you choose to submit new work as you go (based either on the results or on external information):

import concurrent.futures

from concurrent.futures import FIRST_COMPLETED

def main():
    allow_new_work = True  # Set to False to indicate we'll no longer allow new work
    myfancyclasses = [MyFancyClass('Fancy Dan'), ...] # define your initial MyFancyClass instances here
    with concurrent.futures.ProcessPoolExecutor() as executor:
        remaining_futures = {executor.submit(fancy.do_something)
                             for fancy in myfancyclasses}
        while remaining_futures:
            done, remaining_futures = concurrent.futures.wait(remaining_futures,
                                                              return_when=FIRST_COMPLETED)
            for fut in done:
                result = fut.result()
                # Do stuff with result, maybe submit new work in response

            if allow_new_work:
                if should_stop_checking_for_new_work():
                    allow_new_work = False
                    # Let the workers exit when all remaining tasks done,
                    # and reject submitting more work from now on
                    executor.shutdown(wait=False)
                elif has_more_work():
                    # Assumed to return collection of new MyFancyClass instances
                    new_fanciness = get_more_fanciness()
                    remaining_futures |= {executor.submit(fancy.do_something)
                                          for fancy in new_fanciness}
                    myfancyclasses.extend(new_fanciness)
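
(should_stop_checking_for_new_work, has_more_work and get_more_fanciness above are placeholders for your own logic.) If you don't need to feed in new work on the fly, the concurrent.futures version collapses to something much shorter. A minimal sketch, assuming the return-based MyFancyClass from above and an arbitrary worker count of 4 (note that unlike imap_unordered, executor.map yields results in submission order):

import concurrent.futures

def main():
    myfancyclasses = [MyFancyClass('Fancy Dan'), MyFancyClass('Frankie')]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map pickles each instance over to a worker and yields the return values
        for res in executor.map(MyFancyClass.do_something, myfancyclasses):
            print(res)

if __name__ == '__main__':
    main()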
