How to use multiprocessing with class instances in Python?

Question

I am trying to create a class that can run a separate process to go do some work that takes a long time, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.

My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, such as IP address, logs, and runtime. But that class must be able to launch a system command and then return execution to the caller while that system command runs, so the result of the command can be followed up on later.

My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Instance methods are not pickleable. I have tried to fix it in various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?
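To illustrate, the failure can be reproduced with no multiprocessing involved at all. A minimal sketch, assuming Python 2 (which this code targets), where a bound method simply has no pickle support:

import pickle

class Example(object):
    def method(self):
        pass

# Under Python 2 this raises a pickling error (the exact exception
# depends on whether pickle or cPickle is used); multiprocessing hits
# the same wall when a bound method is pushed through its pipe.
pickle.dumps(Example().method)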

Is there any good documentation on using multiprocessing with class instances? The only way I can get the multiprocessing module to work is with simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute a worker's commands in parallel.
    Once launched, it keeps running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overridden method provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in a separate
    process, where it sits and listens for something to do.
    This base class is used by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

I tried to move the ProcessWorker class and the creation of the multiprocessing queues outside of the Worker class, and then tried to manually pickle the worker instance. Even that doesn't work; I get this error:

RuntimeError: Queue objects should only be shared between processes through inheritance

But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:

import pickle

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
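
For what it's worth, the queue complaint can be reproduced without the Worker class at all. A minimal sketch showing that a multiprocessing queue refuses to be pickled outside of process spawning:

import multiprocessing
import pickle

q = multiprocessing.Queue()
# Raises RuntimeError: Queue objects should only be shared between
# processes through inheritance. A queue must be handed to the child
# at fork time (e.g. as a Process argument), never serialized later.
pickle.dumps(q)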

Answer

Instead of attempting to send a method itself (which is impractical), try sending the name of a method to execute.

Provided that each worker runs the same code, it's a matter of a simple getattr(self, task_name).

I'd pass tuples (task_name, task_args), where task_args is a dict fed directly to the task method:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
    task = getattr(self, next_task_name)
    answer = task(**next_task_args)
    ...
else:
    # poison pill, shut down
    break
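
A fuller, self-contained sketch of this name-dispatch pattern (the run_something task and the queue wiring are illustrative, not taken from the question's code):

import multiprocessing

class ProcessWorker(multiprocessing.Process):
    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q

    def run_something(self, command):
        # Illustrative task; a real worker might shell out via subprocess here
        return '%s ran %s' % (self.name, command)

    def run(self):
        while True:
            next_task_name, next_task_args = self.task_q.get()
            if next_task_name is None:            # poison pill, shut down
                self.task_q.task_done()
                break
            task = getattr(self, next_task_name)  # look the method up by name
            self.result_q.put(task(**next_task_args))
            self.task_q.task_done()

if __name__ == '__main__':
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = ProcessWorker(task_q, result_q)
    worker.start()
    task_q.put(('run_something', {'command': 'ls /'}))  # only a string and a dict cross the pipe
    task_q.join()
    print(result_q.get())
    task_q.put((None, None))                            # poison pill
    task_q.join()
    worker.join()

Since nothing unpicklable ever crosses the pipe, the child resolves the bound method on its own inherited copy of self, and the caller blocks only when it actually wants the result.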
