How to get all pool.apply_async processes to stop once any one process has found a match in python


Problem description


I have the following code that leverages multiprocessing to iterate through a large list and find a match. How can I get all processes to stop once a match is found in any one process? I have seen examples, but none of them seem to fit what I am doing here.

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts

def do_job(first_bits):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        # CHECK FOR MATCH HERE
        print(''.join(x))
        # EXIT ALL PROCESSES IF MATCH FOUND

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = []

    for i in range(num_parts):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        pool.apply_async(do_job, (first_bit,))

    pool.close()
    pool.join()
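One way to fill in the two placeholder comments above is a shared stop flag that every worker checks as it iterates. This is a sketch of my own, not from the thread: the `Manager`-based `Event`, the tiny alphabet, and the hard-coded `'cab'` target are all illustrative (a plain `multiprocessing.Event` cannot be passed through `apply_async`, which is why the manager proxy is used):

```python
import itertools
import multiprocessing

alphabet = "abcd"   # tiny alphabet so the sketch runs quickly

def do_job(first_bits, stop, target):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, 2)):
        if stop.is_set():
            return None            # another process already found the match
        key = ''.join(x)
        if key == target:
            stop.set()             # signal every other worker to stop
            return key
    return None

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        stop = manager.Event()     # picklable proxy, safe to pass to pool tasks
        with multiprocessing.Pool(processes=2) as pool:
            results = [pool.apply_async(do_job, (c, stop, 'cab')) for c in alphabet]
            pool.close()
            pool.join()
        matches = [r.get() for r in results if r.get()]
        print(matches)             # ['cab']
```

Workers that see the flag set bail out of their loop early instead of being killed, so this avoids `terminate()` entirely, at the cost of the flag only being checked once per iteration.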

Thanks for your time.

Update 1:


I have implemented the changes suggested in the great approach by @ShadowRanger and it is nearly working the way I want it to. So I have added some logging to give an indication of progress, and put a 'test' key in there to match. I want to be able to increase/decrease iNumberOfProcessors independently of num_parts. At this stage, when I have them both at 4, everything works as expected: 4 processes spin up (one extra for the console). When I change iNumberOfProcessors to 6, 6 processes spin up but only four of them have any CPU usage, so it appears 2 are idle. Whereas with my previous solution above, I was able to set the number of cores higher without increasing num_parts, and all of the processes would get used.


I am not sure how to refactor this new approach to give me the same functionality. Can you have a look and give me some direction on the refactoring needed to set iNumberOfProcessors and num_parts independently of each other and still have all processes used?

Here is the updated code:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6

def do_job(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

if __name__ == '__main__':
    # with statement with Py3 multiprocessing.Pool terminates when block exits
    with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:

        # Don't need special case for final block; slices can safely run past the end of a sequence
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
            if gotmatch:
                break
        else:
            print("No matches found")
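The idle workers above come from having only `num_parts = 4` tasks for 6 processes: `imap_unordered` can never hand out more parts than exist. One possible fix (my own sketch, not from the thread; the shortened alphabet, the `'fad'` target, and the round-robin split are illustrative) is simply to split the alphabet into more parts than processes, so the pool keeps every worker fed until the parts run out:

```python
import itertools
import multiprocessing

alphabet = "abcdefgh"      # shortened alphabet so the sketch runs quickly
num_parts = 8              # deliberately more parts than processes
iNumberOfProcessors = 3

def do_job(first_bits):
    # Search all 3-character keys beginning with this part's characters.
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, 2)):
        key = ''.join(x)
        if key == 'fad':
            return key
    return None

def get_part(i):
    # Round-robin split: works for any num_parts up to len(alphabet).
    return alphabet[i::num_parts]

if __name__ == '__main__':
    with multiprocessing.Pool(processes=iNumberOfProcessors) as pool:
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
            if gotmatch:
                print("Found:", gotmatch)
                break
        else:
            print("No matches found")
```

With more, smaller parts, the two knobs decouple: the pool size controls parallelism while `num_parts` only controls task granularity, and breaking out of the loop still terminates the pool when the `with` block exits.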

Update 2:


Ok, here is my attempt at trying @noxdafox's suggestion. I have put together the following based on the link he provided. Unfortunately, when I run it I get the error:


... line 322, in apply_async
    raise ValueError("Pool not running")
ValueError: Pool not running


Can anyone give me some direction on how to get this working?


Basically the issue is that my first attempt did multiprocessing but did not support canceling all processes once a match was found.


My second attempt (based on @ShadowRanger's suggestion) solved that problem, but broke the ability to scale the number of processes and num_parts independently, which my first attempt could do.


My third attempt (based on @noxdafox's suggestion) throws the error outlined above.


If anyone can give me some direction on how to keep the functionality of my first attempt (scaling the number of processes and num_parts independently) while adding the ability to cancel all processes once a match is found, it would be much appreciated.

Thank you for your time.


Here is the code from my third attempt, based on @noxdafox's suggestion:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4


def find_match(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

class Worker():

    def __init__(self, workers):
        self.workers = workers

    def callback(self, result):
        if result:
            self.pool.terminate()

    def do_job(self):
        print(self.workers)
        self.pool = multiprocessing.Pool(processes=self.workers)
        for part in grouper(alphabet, part_size):
            self.pool.apply_async(find_match, (part,), callback=self.callback)
        self.pool.close()
        self.pool.join()
        print("All Jobs Queued")

if __name__ == '__main__':
    w = Worker(4)
    w.do_job()

Answer


You can check this question to see an implementation example solving your problem.


This also works with concurrent.futures pools.


Just replace the map method with apply_async and iterate over your list from the caller.

Something like this:

for part in grouper(alphabet, part_size):
    pool.apply_async(do_job, (part,), callback=self.callback)

grouper recipe
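Putting the answer's pieces together, a hedged sketch of the whole pattern might look like the following. The tiny search space, the `Finder` class name, and the guard around `apply_async` are my additions, not from the thread; the guard matters because once the callback has terminated the pool, any further `apply_async` raises exactly the `ValueError("Pool not running")` reported above:

```python
import itertools
import multiprocessing

ALPHABET = "abc"   # tiny search space so the sketch finishes instantly
LENGTH = 3
TARGET = "cab"

def find_match(first_bits):
    # Scan every key that starts with one of this worker's characters.
    for x in itertools.product(first_bits, *itertools.repeat(ALPHABET, LENGTH - 1)):
        key = ''.join(x)
        if key == TARGET:
            return key
    return None

class Finder():

    def __init__(self, workers):
        self.workers = workers
        self.found = None

    def callback(self, result):
        # Runs in the parent process as each task finishes.
        if result is not None:
            self.found = result
            self.pool.terminate()   # stop all remaining workers

    def run(self):
        self.pool = multiprocessing.Pool(processes=self.workers)
        for ch in ALPHABET:
            try:
                self.pool.apply_async(find_match, ((ch,),), callback=self.callback)
            except ValueError:
                break   # "Pool not running": a match already terminated the pool
        self.pool.close()
        self.pool.join()
        return self.found

if __name__ == '__main__':
    print(Finder(2).run())   # prints "cab"
```

Because each starting character is its own task, the number of tasks and the number of workers stay independent, and `terminate()` in the callback cancels everything else as soon as one task reports a match.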
