一旦其中一个工人满足特定条件,终止Python多处理程序 [英] Terminate a Python multiprocessing program once a one of its workers meets a certain condition

查看:188
本文介绍了一旦其中一个工人满足特定条件,终止Python多处理程序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用其多处理模块编写Python程序.该程序调用许多辅助函数,每个辅助函数产生一个随机数. 一旦其中一名工人生产的数值大于0.7,我需要终止该程序.

I am writing a Python program using its multiprocessing module. The program calls a number of worker functions, each yielding a random number. I need to terminate the program once one of the workers has produced a number larger than 0.7.

下面是我的程序,其中"操作方法"部分尚未填写.任何的想法?谢谢.

Below is my program where the "how to do this" part is not yet filled out. Any idea? Thanks.

import time
import numpy as np
import multiprocessing as mp
import time
import sys

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    print "From i = ",i, "       res = ",res
    if res>0.7:
        print "find it"
        # terminate  ???? Question: How to do this???


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

推荐答案

没有任何进程可以阻止像os.kill()这样的蛮力大锤.不要去那里.

No process can stop another short of brute force os.kill()-like sledgehammers. Don't go there.

要明智地执行此操作,您需要重新设计基本方法:主流程和工作流程需要相互通信.

To do this sanely, you need to rework your basic approach: the main process and the worker processes need to communicate with each other.

我会充实它,但是到目前为止,示例太简单了,以至于无法使用.例如,按照书面规定,对rand()的调用不会超过num_workers个,因此没有理由相信其中任何一个必须大于0.7.

I'd flesh it out, but the example so far is too bare-bones to make it useful. For example, as written, no more than num_workers calls to rand() are ever made, so there's no reason to believe any of them must be > 0.7.

一旦worker函数增长了一个循环,它就会变得更加明显.例如,工作人员可以检查循环顶部是否设置了mp.Event,如果已设置则直接退出.当希望工人停止时,主要过程将设置Event.

Once the worker function grows a loop, then it becomes more obvious. For example, the worker could check to see if an mp.Event is set at the top of the loop, and just exit if it is. The main process would set the Event when it wants the workers to stop.

,当工人发现值> 0.7时,可以设置其他mp.Event.主要过程将等待Event,然后设置停止时间" Event供工作人员查看,然后执行通常的循环.join()-使工作人员彻底关闭.

And a worker could set a different mp.Event when it found a value > 0.7. The main process would wait for that Event, then set the "time to stop" Event for workers to see, then do the usual loop .join()-ing the workers for a clean shutdown.

在这里充实了一种便携式,干净的解决方案,假设工人将继续努力,直到至少一个工人发现> 0.7的值为止.请注意,我从中删除了numpy,因为它与这段代码无关.此处的代码在任何支持multiprocessing的平台上的任何现有Python上都可以正常工作:

Here's fleshing out a portable, clean solution, assuming the workers are going to keep going until at least one finds a value > 0.7. Note that I removed numpy from this, because it's irrelevant to this code. The code here should work fine under any stock Python on any platform supporting multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print "%d started" % i
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print '%d found %g' % (i, x)
            foundit.set()
            break
        sleep(0.1)
    print "%d is done" % i

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
    foundit.wait()
    quit.set()

还有一些示例输出:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

一切都正常关闭:没有回溯,没有异常终止,没有留下僵尸进程……像哨子一样干净.

Everything shuts down cleanly: no tracebacks, no abnormal terminations, no zombie processes left behind ... clean as a whistle.

正如@noxdafox指出的那样,有一个Pool.terminate()方法可以在所有平台上尽其所能来杀死工作进程,无论它们在做什么(例如,在Windows上称为平台TerminateProcess()).我不建议将其用于生产代码,因为突然终止进程可能会使各种共享资源处于不一致状态,或者让它们泄漏. multiprocessing文档中有关于此的各种警告,您应在其中添加OS文档.

As @noxdafox pointed at, there's a Pool.terminate() method that does the best it can, across platforms, to kill worker processes no matter what they're doing (e.g., on Windows it calls the platform TerminateProcess()). I don't recommend it for production code, because killing a process abruptly can leave various shared resources in inconsistent states, or let them leak. There are various warnings about that in the multiprocessing docs, to which you should add your OS docs.

还是可以的!这是使用此方法的完整程序.请注意,我将临界值提高到0.95,以使其花费的时间比眨眼的时间更长:

Still, it can be expedient! Here's a full program using this approach. Note that I bumped the cutoff to 0.95, to make this more likely to take longer than an eyeblink to run:

import random
from time import sleep

def worker(i):
    print "%d started" % i
    while True:
        x = random.random()
        print '%d found %g' % (i, x)
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print "quitting with %g" % arg
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

还有一些示例输出:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

这篇关于一旦其中一个工人满足特定条件,终止Python多处理程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆