python process pool with timeout on each process not all of the pool


Problem description


I need to run many processes, but not all at once — for example, 4 processes at the same time. multiprocessing.Pool is exactly what I need. But the problem is that I need to terminate a process if it runs longer than a timeout (e.g. 3 seconds). Pool only supports a timeout for waiting on all of the processes, not on each of them. This is what I need:

def f(x):
    process_but_kill_if_it_takes_more_than_3_sec(x)
pool.map(f, inputs)
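For contrast, the closest built-in behavior is AsyncResult.get(timeout), which puts one timeout on collecting the whole batch and kills nothing; a minimal sketch (f and demo here are illustrative names, not from the question):

```python
import time
from multiprocessing import Pool, TimeoutError

def f(n):
    time.sleep(n)
    return n * n

def demo():
    with Pool(4) as pool:
        result = pool.map_async(f, [1, 2, 4])
        try:
            # one timeout for the WHOLE batch, not per task
            return result.get(timeout=3)
        except TimeoutError:
            # raised because the n=4 task is still running;
            # get() itself terminates nothing
            return "batch timed out"

if __name__ == "__main__":
    print(demo())
```

This shows why Pool alone is not enough here: once a single task overruns, the whole map is considered timed out.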


I couldn't find a simple way to use Pool with timeouts. There is a solution from Eli Bendersky: a function that limits the execution time of an arbitrary function via Thread.join(timeout). It works (although its stop method doesn't work well). But this method runs a new, unnecessary thread while the main thread of the process just waits, because we need a timeout controller. It should be possible to control all timeouts from a single point, something like this:
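That thread-based idea can be sketched as follows (the helper name is mine, not from Eli Bendersky's code; note the timed-out thread is merely abandoned, since Python threads cannot be killed):

```python
import threading

def run_with_thread_timeout(timeout, func, *args):
    """Run func(*args) in a daemon thread, waiting at most `timeout` seconds.
    On timeout the thread is abandoned, not stopped, and None is returned."""
    result = []
    worker = threading.Thread(target=lambda: result.append(func(*args)))
    worker.daemon = True
    worker.start()
    worker.join(timeout)
    return result[0] if result else None
```

Being daemon threads, abandoned workers will not keep the interpreter alive, but they do keep consuming resources until they finish on their own — which is exactly the weakness described above.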

import time
from multiprocessing import Process


def f(n):
    time.sleep(n)

timeout = 3
tasks = [1, 2, 4, 1, 8, 2]

procs = []
pool_len = 4
while len(tasks) > 0 or len(procs) > 0:
    if len(tasks) > 0 and len(procs) < pool_len:
        n = tasks.pop(0)
        p = Process(target=f, args=(n,))
        p.start()
        procs.append({'n': n, 'p': p, 't': time.time() + timeout})
    for d in procs[:]:  # iterate over a copy so removing entries is safe
        if not d['p'].is_alive():
            procs.remove(d)
            print('%s finished' % d['n'])
        elif d['t'] < time.time():
            d['p'].terminate()
            procs.remove(d)
            print('%s killed' % d['n'])
    time.sleep(0.05)

The output should be:

1 finished
1 finished
2 finished
4 killed
2 finished
8 killed


Question: Is there a way to use Pool to solve this?

Answer


You could make f(n) cooperative so that it always finishes within a timeout (like in GUI/network event handlers).
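For example, a cooperative variant of the question's f(n) could do its work in small slices and check a deadline between slices (a sketch under the assumption that the work is divisible; the name f_cooperative is mine):

```python
import time

def f_cooperative(n, timeout=3.0):
    """Cooperative stand-in for f(n): work in small slices and
    voluntarily give up once the deadline passes."""
    deadline = time.monotonic() + timeout
    remaining = n
    while remaining > 0:
        if time.monotonic() >= deadline:
            return None               # "killed": gave up cooperatively
        step = min(0.05, remaining)
        time.sleep(step)              # one small unit of work
        remaining -= step
    return n * n                      # finished within the timeout
```

No process needs to be terminated this way, but it only works when the function's body can be restructured around a deadline check.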


If you can't make it cooperative then the only reliable option is to kill the process that is running the function:

import multiprocessing as mp

def run_with_timeout(timeout, func, *args):
    receive_end, send_end = mp.Pipe(duplex=False)
    p = mp.Process(target=func, args=args, kwargs=dict(send_end=send_end))
    p.daemon = True
    p.start()
    send_end.close()  # the child must be the only one with it open
    p.join(timeout)
    if p.is_alive():
        debug('%s timeout', args)  # `debug` is defined in the script below
        p.terminate()
        return None  # signal the timeout to the caller
    return receive_end.recv()  # get the value from the child


The disadvantage is that it requires a new process for each function call (the analog of a Pool with maxtasksperchild=1).


It is easy to run 4 processes at the same time using a thread pool:

#!/usr/bin/env python
import logging
import time
from functools import partial
from multiprocessing.pool import ThreadPool

debug = logging.getLogger(__name__).debug

def run_mp(n, send_end):
    start = time.time()
    debug('%d starting', n)
    try:
        time.sleep(n)
    except Exception as e:
        debug('%d error %s', n, e)
    finally:
        debug('%d done, elapsed: %.3f', n, time.time() - start)
    send_end.send({n: n*n})

if __name__=="__main__":
    tasks = [1, 2, 4, 1, 8, 2]

    logging.basicConfig(format="%(relativeCreated)04d %(message)s", level=logging.DEBUG)
    print(ThreadPool(processes=4).map(partial(run_with_timeout, 3, run_mp), tasks))

Output:

0027 1 starting
0028 2 starting
0030 4 starting
0031 1 starting
1029 1 done, elapsed: 1.002
1032 1 done, elapsed: 1.002
1033 8 starting
1036 2 starting
2031 2 done, elapsed: 2.003
3029 (4,) timeout
3038 2 done, elapsed: 2.003
4035 (8,) timeout
[{1: 1}, {2: 4}, None, {1: 1}, None, {2: 4}]


Beware: there could be forking + threading issues; you could use a fork-server process to work around them.
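A minimal sketch of that workaround (POSIX-only; square and main are illustrative names): request the "forkserver" start method so that workers are forked from a clean, single-threaded server process rather than from the threaded main process:

```python
import multiprocessing as mp

def square(n):
    return n * n

def main():
    # fork workers from a clean, single-threaded server process instead
    # of the (threaded) main process, avoiding fork+thread hazards
    ctx = mp.get_context("forkserver")   # POSIX-only start method
    with ctx.Pool(4) as pool:
        return pool.map(square, [1, 2, 3])

if __name__ == "__main__":
    print(main())   # [1, 4, 9]
```

As with the spawn method, the main module must be importable by the workers, hence the `__main__` guard.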
