python 子类化 multiprocessing.Process [英] python subclassing multiprocessing.Process

查看:103
本文介绍了python 子类化 multiprocessing.Process的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Python 面向对象的新手,我正在将我现有的应用程序重写为面向对象的版本,因为现在开发人员越来越多,我的代码变得不可维护.

通常我使用多处理队列,但我从这个例子中发现 http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html 我可以子类化 multiprocessing.Process 所以我认为这是一个好主意,我写了一个类来测试这样的:

代码:

from multiprocessing import Process类处理器(进程):def return_name(self):返回进​​程 %s"% self.name定义运行(自我):返回 self.return_name()进程 = []如果 __name__ == "__main__":对于范围内的 i (0,5):p=处理器()进程.append(p)p.start()对于进程中的 p:p.join()

但是我无法取回值,如何以这种方式使用队列?

我想获取返回值并考虑将 Queues() 放在哪里.

解决方案

Subclassing multiprocessing.Process:

<块引用>

但是我无法取回值,如何以这种方式使用队列?

Process 需要一个 Queue() 来接收结果... 下面是如何子类 multiprocessing.Process 的示例...

from multiprocessing import Process, Queue类处理器(进程):def __init__(self, queue, idx, **kwargs):超级(处理器,自我).__init__()self.queue = 队列self.idx = idxself.kwargs = kwargs定义运行(自我):"""在这里构建一些 CPU 密集型任务以通过多处理运行."""hash(self.kwargs) # 无耻地使用 CPU 无利可图...##通过multiprocessing.Queue返回一些信息## 注意:self.name 是 multiprocessing.Process 的一个属性self.queue.put("进程 idx={0} 被称为 '{1}'".format(self.idx, self.name))如果 __name__ == "__main__":NUMBER_OF_PROCESSES = 5## 创建一个列表来保存正在运行的处理器对象实例...进程=列表()q = Queue() # 建立一个队列发送给所有进程对象...对于 i 在范围内(0,NUMBER_OF_PROCESSES):p=处理器(队列=q,idx=i)p.start()进程.append(p)# 结合这个答案的想法,如下...# https://stackoverflow.com/a/42137966/667301[proc.join() 用于进程中的 proc]而不是 q.empty():print "RESULT: {0}".format(q.get()) # 从队列中获取结果...

在我的机器上,这导致...

$ python test.py结果:进程 idx=0 被称为Processor-1"结果:进程 idx=4 被称为Processor-5"结果:进程 idx=3 被称为Processor-4"结果:进程 idx=1 被称为Processor-2"结果:进程 idx=2 被称为Processor-3"$


使用multiprocessing.Pool:

FWIW,我发现子类化 multiprocessing.Process 的一个缺点是你不能利用 multiprocessing.Pool 的所有内置优点;Pool 为您提供了一个非常好的 API,如果您需要您的生产者和消费者代码通过队列相互通信.

你可以用一些创造性的返回值做很多事情……在下面的例子中,我使用一个 dict() 来封装来自 pool_job() 的输入和输出值代码>...

from multiprocessing import Pooldef pool_job(input_val=0):# 仅供参考, multiprocessing.Pool 不能保证它保持输入正确排序# dict 格式为 {input: output}...return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}pool = Pool(5) # 使用 5 个多处理进程来处理作业...results = pool.map(pool_job, xrange(0, 12)) # 将 xrange(0, 12) 映射到 pool_job()打印结果

这导致:

<预><代码>[{'pool_job(input_val=0)': 0},{'pool_job(input_val=1)': 12},{'pool_job(input_val=2)': 24},{'pool_job(input_val=3)': 36},{'pool_job(input_val=4)': 48},{'pool_job(input_val=5)': 60},{'pool_job(input_val=6)': 72},{'pool_job(input_val=7)': 84},{'pool_job(input_val=8)': 96},{'pool_job(input_val=9)': 108},{'pool_job(input_val=10)': 120},{'pool_job(input_val=11)': 132}]

显然,pool_job() 中还有许多其他改进,例如错误处理,但这说明了要点.仅供参考,这个答案 提供了如何使用 multiprocessing.Pool 的另一个示例.

I am new to python object oriented and I am rewriting my existing application as an object oriented version, because now developers are increasing and my code is becoming un-maintainable.

Normally I use multiprocessing queues but I found from this example http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html that I can subclass multiprocessing.Process so I think it's a good idea and I wrote a class to test like this:

code:

from multiprocessing import Process
class Processor(Process):
    def return_name(self):
        return "Process %s" % self.name
    def run(self):
        return self.return_name()

processes = []


if __name__ == "__main__":

        for i in range(0,5):
                p=Processor()
                processes.append(p)
                p.start()
        for p in processes:
                p.join()

However I cannot get back the values, how can I use queues in this way?

EDIT: I want to get the return value and thinking where to put Queues().

解决方案

Subclassing multiprocessing.Process:

However I cannot get back the values, how can I use queues in this way?

Process needs a Queue() to receive the results... An example of how to subclass multiprocessing.Process follows...

from multiprocessing import Process, Queue
class Processor(Process):

    def __init__(self, queue, idx, **kwargs):
        super(Processor, self).__init__()
        self.queue = queue
        self.idx = idx
        self.kwargs = kwargs

    def run(self):
        """Build some CPU-intensive tasks to run via multiprocessing here."""
        hash(self.kwargs) # Shameless usage of CPU for no gain...

        ## Return some information back through multiprocessing.Queue
        ## NOTE: self.name is an attribute of multiprocessing.Process
        self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name))

if __name__ == "__main__":
    NUMBER_OF_PROCESSES = 5

    ## Create a list to hold running Processor object instances...
    processes = list()

    q = Queue()  # Build a single queue to send to all process objects...
    for i in range(0, NUMBER_OF_PROCESSES):
        p=Processor(queue=q, idx=i)
        p.start()
        processes.append(p)

    # Incorporating ideas from this answer, below...
    #    https://stackoverflow.com/a/42137966/667301
    [proc.join() for proc in processes]
    while not q.empty():
        print "RESULT: {0}".format(q.get())   # get results from the queue...

On my machine, this results in...

$ python test.py
RESULT: Process idx=0 is called 'Processor-1'
RESULT: Process idx=4 is called 'Processor-5'
RESULT: Process idx=3 is called 'Processor-4'
RESULT: Process idx=1 is called 'Processor-2'
RESULT: Process idx=2 is called 'Processor-3'
$


Using multiprocessing.Pool:

FWIW, one disadvantage I've found to subclassing multiprocessing.Process is that you can't leverage all the built-in goodness of multiprocessing.Pool; Pool gives you a very nice API if you don't need your producer and consumer code to talk to each other through a queue.

You can do a lot just with some creative return values... in the following example, I use a dict() to encapsulate input and output values from pool_job()...

from multiprocessing import Pool

def pool_job(input_val=0):
    # FYI, multiprocessing.Pool can't guarantee that it keeps inputs ordered correctly
    # dict format is {input: output}...
    return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}

pool = Pool(5)  # Use 5 multiprocessing processes to handle jobs...
results = pool.map(pool_job, xrange(0, 12)) # map xrange(0, 12) into pool_job()
print results

This results in:

[
    {'pool_job(input_val=0)': 0}, 
    {'pool_job(input_val=1)': 12}, 
    {'pool_job(input_val=2)': 24}, 
    {'pool_job(input_val=3)': 36}, 
    {'pool_job(input_val=4)': 48}, 
    {'pool_job(input_val=5)': 60}, 
    {'pool_job(input_val=6)': 72}, 
    {'pool_job(input_val=7)': 84}, 
    {'pool_job(input_val=8)': 96}, 
    {'pool_job(input_val=9)': 108}, 
    {'pool_job(input_val=10)': 120}, 
    {'pool_job(input_val=11)': 132}
]

Obviously there are plenty of other improvements to be made in pool_job(), such as error handling, but this illustrates the essentials. FYI, this answer provides another example of how to use multiprocessing.Pool.

这篇关于python 子类化 multiprocessing.Process的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆