call multiprocessing in class method Python


Problem Description


Initially, I have a class that stores some processed values and re-uses them in its other methods.

The problem is that when I tried to split the class method across multiple processes to speed things up, Python spawned the processes but it didn't seem to work (in Task Manager I saw only 1 process running) and the result was never delivered.

I did a couple of searches and found that pathos.multiprocessing can do this instead, but I wonder whether the standard library can solve this problem?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)

a.run(list(range(10)))

Solution

Your code fails because it cannot pickle the instance method (self.cal), which is what Python attempts when you spawn multiple processes by mapping work onto a multiprocessing.Pool (well, there is a way to do it, but it's far too convoluted and not extremely useful anyway). Since there is no shared memory access, it has to 'pack' the data and send it to the spawned process for unpacking. The same would happen if you tried to pickle the a instance.

The only pool in the multiprocessing package that keeps shared-memory access is the little-known multiprocessing.pool.ThreadPool, so if you really want to do this:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

But this will not give you parallelization, because it essentially maps onto regular threads, which do have access to the shared memory but (in CPython) are still serialized by the GIL for CPU-bound work. You should pass class/static methods instead (if you need them called), accompanied by the data you want them to work with (in your case self.vl). If you need to share that data across processes, you'll have to use some shared-memory abstraction, like multiprocessing.Value, applying a mutex along the way of course.
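For example, here is a minimal sketch of that idea, assuming you rework cal into a static method and ship self.vl along with each argument (the tuple packing is my restructuring for illustration, not code from the question):

from multiprocessing import Pool

class A:
    def __init__(self, vl):
        self.vl = vl

    @staticmethod
    def cal(packed):                      # a plain function: picklable by name, needs no instance
        nb, vl = packed                   # the value it needs travels with each argument
        return nb * vl

    def run(self, dt):
        with Pool(processes=4) as t:
            return t.map(A.cal, [(nb, self.vl) for nb in dt])

if __name__ == "__main__":
    a = A(2)
    print(a.run(list(range(10))))         # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Because A.cal is resolved by name when the tasks are pickled, the workers never need the instance itself, only the (nb, vl) tuples.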

UPDATE

I said you could do it (and there are modules that more or less do it, check pathos.multiprocessing for example), but I don't think it's worth the trouble - when you reach the point where you have to trick your system into doing what you want, chances are you're either using the wrong tool or you should rethink your design. But for the sake of completeness, here is one way to do what you want in a multiprocessing setting:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

I think how it works is pretty self-explanatory, but in short it passes the name of your class, its current state (the instance's __dict__), the desired method to be called and the arguments to invoke it with to the parallel_call function, which the Pool workers invoke for each mapped item. Python automatically pickles and unpickles all this data, so all parallel_call needs to do is reconstruct the original object, find the desired method on it and call it with the provided argument(s).
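To make the packaging concrete, one item yielded by prepare_call in the example above looks roughly like this (values assumed from a = A(2) called with the argument 7):

# one 'remote call' package from a.prepare_call("cal", [7]):
["A", {"vl": 2}, "cal", 7]   # class name, instance state, method name, argument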

This way we're passing only the data without trying to pass active objects, so Python doesn't complain (well, in this case, try adding a reference to an instance method to your class parameters and see what happens) and everything works just fine.

If you want to go heavy on the 'magic' you can make it look exactly like your code (create your own Pool handler, pick up names from the functions and send those names to the actual processes, etc.), but this should be sufficient for your example.
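As a rough sketch of such a wrapper, assuming it reuses the A class and the parallel_call function from the block above (run_parallel is a hypothetical helper name, not part of the original answer):

from multiprocessing import Pool

def run_parallel(instance, method_name, args, processes=4):
    # pack a 'remote call' for every argument, exactly like prepare_call does above
    packed = [[instance.__class__.__name__, instance.__dict__, method_name, arg]
              for arg in args]
    with Pool(processes=processes) as pool:
        return pool.map(parallel_call, packed)

if __name__ == "__main__":
    print(run_parallel(A(2), "cal", range(10)))   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]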

However, before you get your hopes up, keep in mind that this only works when sharing a 'static' instance (an instance whose initial state does not change once you start invoking it in a multiprocessing context). If the A.cal method were to change the internal state of the vl attribute, the change would affect only the copy in the worker process where it happens (unless the main instance that calls the Pool changes it between calls). If you want to share the state as well, you can upgrade parallel_call to pick up instance.__dict__ after the call and return it together with the method call result; on the calling side you'd then have to update the local __dict__ with the returned data to change the original state. And even that is not enough - you'd actually have to create a shared dict and handle all the mutex stuff so it can be accessed concurrently by all the processes (you can use multiprocessing.Manager for that).
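Here is a hedged sketch of that 'pick up instance.__dict__ after the call' idea; parallel_call_with_state, the B class and the naive last-writer-wins merge are my assumptions to illustrate the limitation, not code from the original answer:

import sys
from multiprocessing import Pool

def parallel_call_with_state(params):        # like parallel_call, but also ships the state back
    cls = getattr(sys.modules[__name__], params[0])
    instance = cls.__new__(cls)
    instance.__dict__ = params[1]
    method = getattr(instance, params[2])
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    result = method(*args)
    return result, instance.__dict__          # the (possibly changed) state travels with the result

class B(object):                              # hypothetical class whose method mutates its state
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        self.vl += 1                          # a state change that would otherwise be lost
        return nb * self.vl

    def run(self, dt):
        packs = [[self.__class__.__name__, self.__dict__, "cal", arg] for arg in dt]
        with Pool(processes=4) as t:
            out = t.map(parallel_call_with_state, packs)
        self.__dict__.update(out[-1][1])      # naive merge: the last worker's state wins
        return [result for result, _ in out]

if __name__ == "__main__":
    b = B(2)
    print(b.run(list(range(10))))             # [0, 3, 6, ..., 27]: every worker saw vl == 2
    print(b.vl)                               # 3, not 12 - the increments do not accumulate

Even with this, the workers never see each other's changes, which is why you'd end up needing a shared dict via multiprocessing.Manager plus locking for genuinely shared state.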

So, as I was saying, more trouble than it's worth...
