Python multiprocessing.Pool.map dying silently

Question

I have tried to put a for loop in parallel to speed up some code. Consider this:

from multiprocessing import Pool

results = []

def do_stuff(str):
    print str
    results.append(str)

p = Pool(4)
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000
p.close()

print results

I have some debug messages printed from do_stuff to keep track of how far the program gets before dying. It seems to die at a different point each time. For example, it will print 'str297' and then just stop running; I see all the CPUs stop working and the program just sits there. There should be some error occurring, but no error message is shown. Does anyone know how to debug this problem?
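If the worker really is raising an exception, Pool.map in the parent will normally re-raise it, but the traceback from inside the child process is lost, which can make a failure look silent. One way to check is to have the worker print its own traceback before failing. Here is a minimal sketch, not part of the original code, using a small report_errors decorator:

import traceback
from functools import wraps
from multiprocessing import Pool

def report_errors(func):
    # Wrap a pool worker so any exception prints its full traceback
    # from inside the child process before being re-raised.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            traceback.print_exc()
            raise
    return wrapper

@report_errors
def do_stuff(text):
    return text

if __name__ == '__main__':
    p = Pool(4)
    results = p.map(do_stuff, ['str{}'.format(i) for i in range(2000)])
    p.close()
    p.join()

If no traceback appears and the pool still stalls, the worker is most likely blocked rather than crashing.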

Update

I tried re-working the code a little. Instead of using the map function, I tried the apply_async function like this:

pool = Pool(5)
# results = pool.map(do_sym, underlyings[0::10])  # earlier map-based attempt, left here unused
results = []
for sym in underlyings[0::10]:
    r = pool.apply_async(do_sym, [sym])
    results.append(r)

pool.close()
pool.join()

for result in results:
    print result.get(timeout=1000)

This worked just as well as the map function, but it ended up hanging in the same way: it never gets to the for loop where the results are printed.
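One thing that can make this kind of hang easier to diagnose (not something the original post does) is to call get with a short timeout on each AsyncResult before pool.close()/pool.join(), so the main process never blocks forever in join and the output names the task that never finished. A rough sketch, assuming do_sym and underlyings from the snippet above:

import multiprocessing as mp

pool = mp.Pool(5)
async_results = [pool.apply_async(do_sym, [sym]) for sym in underlyings[0::10]]

results = []
for sym, r in zip(underlyings[0::10], async_results):
    try:
        # A finite timeout turns a silent hang into a visible error
        # that identifies the symbol whose task never completed.
        results.append(r.get(timeout=60))
    except mp.TimeoutError:
        print('task for {} did not finish within 60s'.format(sym))

pool.close()
pool.join()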

After working on this a little more, and trying the debug logging suggested in unutbu's answer, I will give some more info here. The problem is very strange. It seems like the pool just hangs there, unable to close and let the program continue. I use the PyDev environment for testing my programs, but I thought I would try running Python in the console. There I get the same behavior, but when I press Ctrl+C to kill the program, I get some output which might explain where the problem is:

KeyboardInterrupt ^CProcess PoolWorker-47: Traceback (most recent call last):   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap Process PoolWorker-48: Traceback (most recent call last):   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap Process PoolWorker-45: Process PoolWorker-46: Process PoolWorker-44:
    self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last):   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    racquire()
    self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
KeyboardInterrupt
    task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    self.run()
    self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    racquire()   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker KeyboardInterrupt
    task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    task = get()
    task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    racquire()
    return recv()
    racquire() KeyboardInterrupt KeyboardInterrupt KeyboardInterrupt

Even then the program never actually dies; I end up having to close the terminal window to kill it.

Update 2

I narrowed the problem down to something inside the function that runs in the pool: a MySQL database transaction was causing it. I was using the MySQLdb package before. I switched the transaction to a pandas.read_sql call, and it is working now.
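A likely reason MySQLdb caused this: if the connection is opened in the parent process before the Pool forks, all the workers inherit and share the same socket, and they can end up deadlocked on it. A common workaround is to open one connection per worker in a Pool initializer. The sketch below is illustrative only; the connection parameters and query are placeholders, not from the original code:

import multiprocessing as mp
import MySQLdb

_conn = None  # one connection per worker process

def init_worker():
    # Runs once inside each child process after the fork, so every
    # worker gets its own MySQL connection instead of sharing the
    # parent's. Placeholder credentials.
    global _conn
    _conn = MySQLdb.connect(host='localhost', user='user',
                            passwd='secret', db='mydb')

def do_sym(sym):
    cur = _conn.cursor()
    cur.execute("SELECT price FROM quotes WHERE symbol = %s", (sym,))  # placeholder query
    return cur.fetchall()

if __name__ == '__main__':
    pool = mp.Pool(5, initializer=init_worker)
    results = pool.map(do_sym, ['AAPL', 'MSFT', 'GOOG'])
    pool.close()
    pool.join()
    print(results)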

Answer

pool.map returns the results in a list. So instead of calling results.append in the worker processes (which will not work, since each process has its own independent copy of results), assign results to the value returned by pool.map in the main process:

import multiprocessing as mp

def do_stuff(text):
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    print(results)

yields

['str0', 'str1', 'str2', 'str3', ...]


One way to debug scripts that use multiprocessing is to add logging statements. The multiprocessing module provides a helper function, mp.log_to_stderr, for this purpose. For example,

import multiprocessing as mp
import logging

logger = mp.log_to_stderr(logging.DEBUG)

def do_stuff(text):
    logger.info('Received {}'.format(text))
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    logger.info(results)

yields logging output such as:

[DEBUG/MainProcess] created semlock with handle 139824443588608
[DEBUG/MainProcess] created semlock with handle 139824443584512
[DEBUG/MainProcess] created semlock with handle 139824443580416
[DEBUG/MainProcess] created semlock with handle 139824443576320
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-1] Received str0
[INFO/PoolWorker-2] Received str125
[INFO/PoolWorker-3] Received str250
[INFO/PoolWorker-4] Received str375
[INFO/PoolWorker-3] Received str251
...
[INFO/PoolWorker-4] Received str1997
[INFO/PoolWorker-4] Received str1998
[INFO/PoolWorker-4] Received str1999
[DEBUG/MainProcess] closing pool
[INFO/MainProcess] ['str0', 'str1', 'str2', 'str3', ...]
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] task handler exiting
[DEBUG/PoolWorker-3] worker exiting after 2 tasks
[INFO/PoolWorker-3] process shutting down
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] terminating workers
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] cleaning up worker 4811
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Notice that each line indicates which process emitted the logging record, so the output to some extent serializes the order of events across your concurrent processes.

By judicious placement of logging.info calls you should be able to narrow down where, and maybe why, your script is "dying silently" (or at least it won't be quite so silent as it dies).
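Applied to the situation in the question, that means logging immediately before and after the suspect call inside the worker: the last message a worker prints identifies the statement it is blocked on. A short sketch along those lines (run_query is a hypothetical stand-in for the MySQL transaction):

import multiprocessing as mp
import logging

logger = mp.log_to_stderr(logging.INFO)

def do_sym(sym):
    logger.info('querying %s', sym)
    rows = run_query(sym)  # hypothetical stand-in for the blocking MySQL call
    logger.info('finished %s, %d rows', sym, len(rows))
    return rows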
