How to run nested, hierarchical pathos multiprocessing maps?


Problem description

Having built a significant part of my code on dill serialization/pickling, I'm also trying to use pathos multiprocessing to parallelize my calculations. Pathos is a natural extension of dill.

When trying to run a nested

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

inside another ProcessingPool().map, I receive:

AssertionError: daemonic processes are not allowed to have children

For example:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    # range works on both Python 2 and 3; xrange exists only on Python 2
    return ProcessingPool().map(triple, range(5))

ProcessingPool().map(refork, range(3))

yields

AssertionError: daemonic processes are not allowed to have children

I tried using amap(...).get() without success. This is on pathos 0.2.0.

What is the best way to allow for nested parallelization?

Update

I have to be honest at this point and confess that I have removed the assertion "daemonic processes are not allowed to have children" from pathos. I also built something that cascades KeyboardInterrupt to the workers and to the workers of those workers. Parts of the solution are below:

import os
import signal
import sys
from time import sleep

from pathos.multiprocessing import ProcessingPool


def get_pid_i(_):
    # Assumed helper (not shown in the original post): report the worker's PID.
    return os.getpid()


def run_parallel(exec_func, exec_args, num_workers_i):
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    # Collect worker PIDs up front so they can be signalled on failure.
    pid_is = pool.map(get_pid_i, range(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        # Poll instead of blocking so that Ctrl+C can be handled here.
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
        return results  # assumed: the original snippet does not show a return
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

This seems to work from the console and from an IPython notebook (with the stop button), but I'm not sure it is 100% correct in all corner cases.

Recommended answer

I encountered exactly the same issue. In my case, the inner operation was the one that needed parallelism, so I ran a ProcessingPool inside a ThreadingPool. Here it is with your example:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

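def refork(x):
    from pathos.multiprocessing import ProcessingPool
    # Inner level: real processes (range works on both Python 2 and 3)
    return ProcessingPool().map(triple, range(5))

# Outer level: threads, which are allowed to start process pools
ThreadingPool().map(refork, range(3))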

You can even add another layer with another outer threading pool. Depending on your case, you can invert the order of these pools. However, you cannot have processes of processes. If you really need that, see: https://stackoverflow.com/a/8963618/6522112. I haven't tried it myself, so I can't elaborate on it.
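For readers who do need processes of processes, here is a rough sketch of the idea behind the linked answer: a pool whose workers are not daemonic. It is only a sketch under stated assumptions. It uses the standard library's multiprocessing rather than pathos, it assumes Python 3.8 or later, and the names NoDaemonProcess, NoDaemonContext and NestablePool are illustrative rather than part of any library. Whether the same trick carries over to pathos is not verified here.

```python
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    """Process whose daemon flag always reads False (illustrative name)."""

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Pool marks its workers as daemonic; silently ignore that request.
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    """The platform's default context, but creating NoDaemonProcess workers."""
    Process = NoDaemonProcess


class NestablePool(multiprocessing.pool.Pool):
    """A Pool whose workers are allowed to start pools of their own."""

    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super().__init__(*args, **kwargs)


def triple(x):
    return 3 * x


def refork(x):
    # Innermost level: an ordinary (daemonic) Pool is sufficient here.
    with multiprocessing.Pool(2) as inner:
        return inner.map(triple, range(5))


if __name__ == "__main__":
    # Outer level: non-daemonic workers, so each may create its own pool.
    with NestablePool(2) as outer:
        print(outer.map(refork, range(3)))
```

Because the outer workers are not daemonic, they are not killed automatically when the parent exits, so the pool should always be shut down explicitly (here the with block does that).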
