Basic multiprocessing with while loop


Problem description

I am brand new to the multiprocessing package in Python and my confusion will probably be easy for someone who knows more to clear up. I've been reading about concurrency and have searched for other questions like this and have found nothing. (FYI I do NOT want to use multithreading because the GIL will slow down my application a lot.)

I am thinking in the framework of events. I want to have multiple processes running, waiting for an event to happen. If the event happens, it gets assigned to a particular process, which operates and then returns to its idle state. There may be a better way to do this, but my reasoning is that I should spawn all the processes once and keep them open indefinitely, rather than creating then closing a process every time an event happens. Speed is an issue for me and my events can occur many thousands of times per second.

I came up with the following toy example, which is meant to send even numbers to one process, and odd numbers to another. Both processes are the same, they just append the number to a list.

from multiprocessing import Process, Queue, Pipe

slist=['even','odd']

Q={}
Q['even'] = Queue()
Q['odd'] = Queue()

ev,od = [],[]

Q['even'].put(ev)
Q['odd'].put(od)

P={}
P['even'] = Pipe()
P['odd'] = Pipe()



def add_num(s):
    """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
#    while True :
    if not P[s][1].recv():
        print(s, '- do nothing')

    else:
        d = Q[s].get()
        print(d)
        d.append(P[s][1].recv())
        Q[s].put(d)
        print(Q[s].get())
        P[s][0].send(False)
        print('ya')




def piper(s,n):

    P[s][0].send(n)    
    for k in [S for S in slist if S != s]:
        P[k][0].send(False) 
    add_num(s)


procs = [ Process (
                   target=add_num,
                   args=(i,)
                   ) 
         for i in ['even','odd']]

for s in slist: 
    P[s][0].send(False)

for p in procs:
    p.start()  
    p.join()

for i in range(10):
    print(i)
    if i%2==0:
        s = 'even'
    else:
        s = 'odd'
    piper(s,i)


print('results:', Q['odd'].get(), Q['even'].get())

This code produces the following output:

even - do nothing

Any insight from the wise into this problem, where my code or reasoning falls short etc. would be greatly appreciated.

Answer

Here is an approach I've used a couple of times with good success:

  1. Start a multiprocessing pool.

  2. Use a multiprocessing SyncManager to create multiple queues (one for each type of data that needs to be handled differently).

  3. Use apply_async to launch the functions that process the data. Just like the queues, there should be one function for each type of data that needs to be processed differently. Each function launched gets the queue that corresponds to its data as an input argument. The functions do their work in an infinite loop that starts by getting data from the queue.

  4. Begin processing. During processing, the main process sorts the data and decides which function should be handling it. Once the decision is made, the data is placed on the queue that corresponds to that function.

  5. After all data has been handled, the main process puts a value called a "poison pill" into each queue. The poison pill is a value that the worker processes all recognize as a signal to exit. Since the queues are first-in, first-out (FIFO), the workers are guaranteed to pull the poison pill as the last item from their queues.

  6. Close and join the multiprocessing pool.


Code

Below is an example of this algorithm. The example code's goal is to use the algorithm previously described to divide odd numbers by 2, and even numbers by -2. All results are placed in a shared list accessible by the main process.

import multiprocessing

POISON_PILL = "STOP"

def process_odds(in_queue, shared_list):

    while True:

        # block until something is placed on the queue
        new_value = in_queue.get() 

        # check to see if we just got the poison pill
        if new_value == POISON_PILL:
            break

        # we didn't, so do the processing and put the result in the
        # shared data structure
        shared_list.append(new_value/2)

    return

def process_evens(in_queue, shared_list):

    while True:    
        new_value = in_queue.get() 
        if new_value == POISON_PILL:
            break

        shared_list.append(new_value/-2)

    return

def main():

    # create a manager - it lets us share native Python object types like
    # lists and dictionaries without worrying about synchronization - 
    # the manager will take care of it
    manager = multiprocessing.Manager()

    # now using the manager, create our shared data structures
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # lastly, create our pool of workers - this spawns the processes, 
    # but they don't start actually doing anything yet
    pool = multiprocessing.Pool()

    # now we'll assign two functions to the pool for them to run - 
    # one to handle even numbers, one to handle odd numbers
    odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
    even_result = pool.apply_async(process_evens, (even_queue, shared_list))
    # this code doesn't do anything with the odd_result and even_result
    # variables, but you have the flexibility to check exit codes
    # and other such things if you want - see docs for AsyncResult objects

    # now that the processes are running and waiting for their queues
    # to have something, lets give them some work to do by iterating
    # over our data, deciding who should process it, and putting it in
    # their queue
    for i in range(6):

        if (i % 2) == 0: # use mod operator to see if "i" is even
            even_queue.put(i)

        else:
            odd_queue.put(i)

    # now we've finished giving the processes their work, so send the 
    # poison pill to tell them to exit
    even_queue.put(POISON_PILL)
    odd_queue.put(POISON_PILL)

    # wait for them to exit
    pool.close()
    pool.join()

    # now we can check the results
    print(shared_list)

    # ...and exit!
    return


if __name__ == "__main__":
    main()


Output

This code produces this output:

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]


Notice that the order of the results is unpredictable, because we can't guarantee in what order the functions will be able to get items from their queues and put the results into the list. But you can certainly do whatever post-processing you need, which could include sorting.
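For instance, a minimal post-processing sketch, assuming the shared_list from the example above:

# copy the manager-backed list into a plain, sorted list
results = sorted(shared_list)  # [-2.0, -1.0, -0.0, 0.5, 1.5, 2.5]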

I think this would be a good solution to your issue because:

  1. You're correct that there is huge overhead to spawning processes. This single-producer/multiple-consumer approach eliminates that overhead by using a pool to keep workers alive for the entire duration of the program.

  2. It addresses your concern about being able to handle data differently depending on its attributes. In your comments, you expressed concerns about being able to send data to specific processes. In this approach, you can choose which process to give data to, because you have to choose which queue to put it on. (By the way, I think you're thinking of the pool.map function, which, as you correctly believe, doesn't allow you to perform different operations in the same job. apply_async does.)

  3. I've found it to be very expandable and flexible. Need to add more types of data handling? Just write your handler function, add one more queue, and add logic to main to route the data to your new function. Are you finding that one queue is getting backed up and becoming a bottleneck? You can call apply_async with the same target function and queue multiple times to get multiple workers working on the same queue, as in the sketch after this list. Just make sure you give the queue enough poison pills so that all of the workers get one.
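Here is a minimal sketch of that scaling idea. It assumes the process_odds, process_evens, and POISON_PILL definitions from the example above; NUM_ODD_WORKERS is a hypothetical name, not part of the original code:

import multiprocessing

NUM_ODD_WORKERS = 3  # hypothetical: three workers will share the odd queue

def main_scaled():
    # same manager-backed structures as in the example above
    manager = multiprocessing.Manager()
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # size the pool so every worker gets its own process
    pool = multiprocessing.Pool(NUM_ODD_WORKERS + 1)

    # calling apply_async several times with the same target and queue
    # gives us several workers all consuming from odd_queue
    for _ in range(NUM_ODD_WORKERS):
        pool.apply_async(process_odds, (odd_queue, shared_list))
    pool.apply_async(process_evens, (even_queue, shared_list))

    for i in range(6):
        (even_queue if i % 2 == 0 else odd_queue).put(i)

    # one poison pill per worker on each queue, so every worker gets one
    for _ in range(NUM_ODD_WORKERS):
        odd_queue.put(POISON_PILL)
    even_queue.put(POISON_PILL)

    pool.close()
    pool.join()
    print(shared_list)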


Limitations

Any data you want to pass on a queue must be picklable (serializable) by the pickle module. See the pickle documentation for what can and can't be pickled.
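As a quick sanity check, here is a sketch of a small helper (hypothetical, not part of the example above) that tests whether an object survives a pickle round-trip:

import pickle

def is_picklable(obj):
    # try a full round-trip; anything that fails can't travel on a queue
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False

print(is_picklable({"value": 42}))   # True - plain containers are fine
print(is_picklable(lambda x: x))     # False - lambdas can't be pickled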

There are probably other limitations as well, but I can't think of any others off the top of my head.
