Python Pathos进程池是否非守护进程? [英] Python pathos Process Pool non-daemonic?

查看:205
本文介绍了Python Pathos进程池是否非守护进程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我如何在python3中而不是在多处理模块中使用pathos实现非守护进程?

How can I implement non-daemonic processes with pathos in python3 instead of with the multiprocessing module?

更具体地说,我指的是: Python进程池非守护程序?

To be more specific, I am referring to: Python Process Pool non-daemonic?

这篇文章的答案通过多处理模块实现了非守护进程.不幸的是,该模块不允许在其他对象中使用lambda函数,但在Python 2中却可以这样做.

The answer to this post implements non-daemonic processes via the multiprocessing module. Unfortunately, this module does not allow to pickle lambda functions among other objects, but pathos does in Python 2:

#import multiprocessing
#import multiprocessing.pool
import pathos

#class NoDaemonProcess(multiprocessing.Process):
class NoDaemonProcess(pathos.multiprocessing.Pool.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

#class NoDaemonPool(multiprocessing.pool.Pool):
class NoDaemonPool(pathos.multiprocessing.Pool):
    Process = NoDaemonProcess

def myproc(args):
    i, max_workers = args
    #pool = multiprocessing.Pool(max_workers)
    pool = pathos.pools.ProcessPool(max_workers)
    l_args = [j for j in range(i)]
    mysubproc = lambda x : x
    print("myproc", l_args, pool.map(mysubproc, l_args))
    return i

max_workers = [2, 1]
executor = NoDaemonPool(max_workers[0])
#executor = pathos.multiprocessing.Pool(max_workers[0])
l_args = [(i, max_workers[1]) for i in range(10)]
print(executor.map(myproc, l_args))

输出:

('myproc', [], [])
('myproc', [0, 1], [0, 1])
('myproc', [0], [0])
('myproc', [0, 1, 2], [0, 1, 2])
('myproc', [0, 1, 2, 3], [0, 1, 2, 3])
('myproc', [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5])
('myproc', [0, 1, 2, 3, 4], [0, 1, 2, 3, 4])
('myproc', [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

在Python 3中,pathos模块已更改. Python 2,例如 pathos.multiprocessing.Pool.Process 不再是一个类,而是一个函数,因此不能再将其用于继承(请参见上文). -我缺少任何可悲的文档吗?

In Python 3, the pathos module has changed w.r.t. Python 2, e.g., pathos.multiprocessing.Pool.Process is not a class anymore, but a function, so one can no longer use it for inheritance (see above). - any pathos documentation I am missing?

如何使以上代码在Python 3的混乱中正常工作?

How can I make the above code work in pathos in Python 3?

作为上述特定示例的一种变通方法,可以简单地回退到多处理NoDaemonPool实现,然后对守护程序子进程使用pathos:

As a work-around for the above particular example, one can simply fall back to the multiprocessing NoDaemonPool implementation, and use pathos for the daemon sub-processes:

import multiprocessing
import multiprocessing.pool
import pathos

class NoDaemonProcess(multiprocessing.Process):
#class NoDaemonProcess(pathos.multiprocessing.Pool.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(multiprocessing.pool.Pool):
#class NoDaemonPool(pathos.multiprocessing.Pool):
    Process = NoDaemonProcess

def myproc(args):
    i, max_workers = args
    #pool = multiprocessing.Pool(max_workers)
    pool = pathos.pools.ProcessPool(max_workers)
    l_args = [j for j in range(i)]
    mysubproc = lambda x : x
    print("myproc", l_args, pool.map(mysubproc, l_args))
    return i

max_workers = [2, 1]
executor = NoDaemonPool(max_workers[0])
#executor = pathos.multiprocessing.Pool(max_workers[0])
l_args = [(i, max_workers[1]) for i in range(10)]
print(executor.map(myproc, l_args))

但是,此替代方法不是解决方案,因为 (i)进口病情和多重加工,甚至更重要的是 (ii)无法腌制,例如,如果将myproc定义为

However, this work-around is not a solution because (i) its imports both pathos and multiprocessing, and even more importantly (ii) it won't be able to pickle, e.g., if myproc instead is defined as

myproc = lambda x : x

非常感谢你, 最好, 塞巴斯蒂安

Thank you very much, Best, Sebastian

推荐答案

我刚刚找到了Python 3的答案,即采用了最初的想法

I just found the answer myself for Python 3 taking the original idea from

Python进程池非守护程序?

并按照

如何运行嵌套的,分层的pathos多处理地图?

import pathos
import signal
import sys
import os
import time

# redefine process pool via inheritance
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)

def get_pid_i(x):
    return os.getpid()
def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

def myproc(args):
    i, max_workers = args
    l_args = [j for j in range(i)]
    mysubproc = lambda x : x
    pool = pathos.pools.ProcessPool(max_workers)
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, range(max_workers))
    try:
        l_traj_df = pool.amap(mysubproc, l_args)
        counter_i = 0
        while not l_traj_df.ready():
            time.sleep(1)
            if counter_i % 30 == 0:
                print('Waiting for children running in pool.amap() in myproc( {} ) with PIDs: {}'.format(i, pid_is))
            counter_i += 1
        l_traj_df = l_traj_df.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received in myproc( {} ), attempting to terminate pool...').format(myens)
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {} in myproc( {} )'.format(sys.exc_info()[0], myens))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise

#myproc = lambda x : x

max_workers = [2, 1]
pool = NoDaemonPool(max_workers[0])
#pool.restart(force=True)
pid_is = pool.map(get_pid_i, range(max_workers[0]))
try:
    results = pool.map_async(myproc, l_args)
    counter_i = 0
    while not results.ready():
        if counter_i % 30 == 0:
            print('Waiting for children running in pool.map_async() in main() with PIDs: {}'.format(pid_is))
        time.sleep(2)
        counter_i += 1
    results = results.get()
    pool.close()
    pool.join()
except KeyboardInterrupt:
    print('Ctrl+C received in main(), attempting to terminate pool...')
    hard_kill_pool(pid_is, pool)  # sending Ctrl+C
    raise
except:
    print('Attempting to close parallel after exception: {} in main()'.format(_sys.exc_info()[0]))
    hard_kill_pool(pid_is, pool)  # sending Ctrl+C
    raise

输出:

Waiting for children running in pool.map_async() in main() with PIDs [15015, 15014]
Waiting for children running in pool.amap() in myproc( 2 ) with PIDs [15019]
Waiting for children running in pool.amap() in myproc( 1 ) with PIDs: [15020]
Waiting for children running in pool.amap() in myproc( 3 ) with PIDs [15021]
Waiting for children running in pool.amap() in myproc( 4 ) with PIDs [15022]
Waiting for children running in pool.amap() in myproc( 6 ) with PIDs [15024]
Waiting for children running in pool.amap() in myproc( 5 ) with PIDs [15023]
Waiting for children running in pool.amap() in myproc( 7 ) with PIDs [15025]
Waiting for children running in pool.amap() in myproc( 8 ) with PIDs [15026]
Waiting for children running in pool.amap() in myproc( 9 ) with PIDs [15028]

我使用了以下模块版本:

I used the following module versions:

python                    3.6.0                         0
pathos                    0.2.1                    py36_1    condo-forge
multiprocess              0.70.4                   py36_0    http://conda.binstar.org/omnia
dill                      0.2.7.1          py36h644ae93_0  
pox                       0.2.3                    py36_0    conda-forge
ppft                      1.6.4.7.1                py36_0    conda-forge
six                       1.10.0                   py36_0  

这篇关于Python Pathos进程池是否非守护进程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆