How to run nested, hierarchical pathos multiprocessing maps?


Question

Having built a significant part of my code on dill serialization/pickling, I'm also trying to use pathos multiprocessing to parallelize my calculations. pathos is a natural extension of dill.

When trying to run a nested

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

inside another ProcessingPool().map, I receive:

AssertionError: daemonic processes are not allowed to have children

For example:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, range(5))

ProcessingPool().map(refork, range(3))

yields

AssertionError: daemonic processes are not allowed to have children

I tried using amap(...).get() without success. This is on pathos 0.2.0.

What is the best way to allow for nested parallelization?
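For context on the error itself: pool workers are started as daemonic processes, and a daemonic process is not allowed to start children of its own. The sketch below only makes that flag visible. It uses the standard-library multiprocessing module as a stand-in for pathos (an assumption, made so the example has no third-party dependency), and is_daemonic is a hypothetical helper name, not part of either library.

```python
import multiprocessing


def is_daemonic(_=None):
    """Return True when called inside a daemonic process, such as a pool worker."""
    return multiprocessing.current_process().daemon


if __name__ == "__main__":
    # The parent process is not daemonic, so it may create a pool.
    print("parent daemonic:", is_daemonic())
    # Each pool worker reports its own daemon flag.
    with multiprocessing.Pool(2) as pool:
        print("workers daemonic:", pool.map(is_daemonic, range(2)))
```

Any code that tries to start a process pool from inside one of those workers hits the assertion shown in the question.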

Update

I have to be honest at this point, and confess that I have removed the assertion "daemonic processes are not allowed to have children" from pathos. I also built something which cascades KeyboardInterrupt to workers and workers of those... Parts of the solution below:

import os
import signal
import sys
from time import sleep

from pathos.multiprocessing import ProcessingPool


def run_parallel(exec_func, exec_args, num_workers_i):
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    # get_pid_i is not shown in this post; it is assumed to return
    # the PID of the worker it runs in (e.g. via os.getpid()).
    pid_is = pool.map(get_pid_i, range(num_workers_i))
    try:
        results = pool.amap(exec_func, exec_args)
        counter_i = 0
        # Poll rather than block, printing a status line every 60 polls
        # (roughly every two minutes with a 2-second sleep).
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
        return results
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    # Forward Ctrl+C to every worker, then tear the pool down.
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

Seems to work from console and IPython notebook (with stop button), but not sure it's 100% correct in all corner cases.
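The polling part of run_parallel can be tried in isolation. The following is a minimal sketch, not the code from the update: it swaps in the standard-library ThreadPool and its map_async for pathos's ProcessingPool and amap (an assumption, so that it runs without pathos and without child processes), and wait_for and run_polled are hypothetical names.

```python
import time
from multiprocessing.pool import ThreadPool


def triple(x):
    return 3 * x


def wait_for(async_result, poll_seconds=0.05, report_every=20):
    """Poll an async result until it is ready, then return its value."""
    polls = 0
    while not async_result.ready():
        time.sleep(poll_seconds)
        polls += 1
        if polls % report_every == 0:
            print("still waiting after {} polls".format(polls))
    return async_result.get()


def run_polled(func, args, num_workers=2):
    """Map func over args asynchronously and poll for the result."""
    pool = ThreadPool(num_workers)
    try:
        results = wait_for(pool.map_async(func, args))
        pool.close()
        pool.join()
        return results
    except BaseException:
        # Includes KeyboardInterrupt: tear the pool down before re-raising.
        pool.terminate()
        raise


if __name__ == "__main__":
    print(run_polled(triple, range(3)))
```

Because the parent sleeps in short intervals instead of blocking inside get(), it regains control between polls, which is where the update's code reacts to Ctrl+C.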

Recommended answer

I encountered exactly the same issue. In my case, the inner operation was the one that needed parallelism, so I ran a ProcessingPool inside a ThreadingPool. Here it is with your example:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, range(5))

ThreadingPool().map(refork, range(3))

You can even add another layer with another outer threading pool. Depending on your case, you can invert the order of these pools. However, you cannot have processes of processes. If really needed, see: https://stackoverflow.com/a/8963618/6522112. I haven't tried it myself yet, so I can't elaborate on this.
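As a rough illustration of the inverted order mentioned above (processes on the outside, threads on the inside), here is a minimal sketch. It uses the standard-library multiprocessing.Pool and multiprocessing.pool.ThreadPool as stand-ins for the pathos pools (an assumption, to keep the example dependency-free), and rethread is a hypothetical name chosen to mirror refork.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def triple(x):
    return 3 * x


def rethread(x):
    # Inner layer: starting threads is allowed inside a daemonic pool worker.
    with ThreadPool(5) as pool:
        return pool.map(triple, range(5))


if __name__ == "__main__":
    # Outer layer: a process pool, with each task fanning out to threads.
    with Pool(3) as pool:
        print(pool.map(rethread, range(3)))
```

Which layer should be the process pool likely depends on where the CPU-bound work sits, since threads in CPython share one interpreter lock.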
