Python multiprocessing issues

Problem description

A few issues crop up when I use processes and queues.

When I run the following code, the target function simply gets an item from a master queue and adds it to another queue specific to that process.

import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
    main_queue.put(i)

all_queues = []
processes = []
# Create processes
for i in xrange(5):
    # Each process gets a queue that it will put numbers into
    queue = multiprocessing.Queue()
    # Keep up with the queue we are creating so we can get it later
    all_queues.append(queue)
    # Pass in our master queue and the queue we are transferring data to
    process = multiprocessing.Process(target=my_callable,
                                      args=(main_queue, queue))
    # Keep up with the processes
    processes.append(process)

for thread in processes:
    thread.start()

for thread in processes:
    thread.join()

When the target function prints the queue being used, you'll notice that one queue is used almost exclusively.

If you then take the output and print it, you'll see that most of the numbers end up under a single queue.

def queue_get_all(q):
    items = []
    maxItemsToRetreive = 100
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

for tmp in all_queues:
    print queue_get_all(tmp)
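
For readers on Python 3, a rough equivalent of queue_get_all might look like the sketch below. It is an illustration, not the asker's code: the name drain and the 0.1 s default timeout are assumptions. It waits briefly on each get() instead of calling get_nowait(), because the multiprocessing documentation notes that an item just put on a queue may take a moment to become visible to get_nowait().

```python
import multiprocessing
import queue  # Python 3 name of Python 2's "Queue" module; provides queue.Empty


def drain(q, max_items=100, timeout=0.1):
    """Return up to max_items items from q.

    Stops early once nothing arrives within `timeout` seconds
    (hypothetical helper, analogous to queue_get_all above).
    """
    items = []
    for _ in range(max_items):
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break  # nothing arrived in time: treat the queue as drained
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for i in range(5):
        q.put(i)
    print(drain(q))  # [0, 1, 2, 3, 4]
```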

What is causing this? Is there something in my code I should be doing that will even out the work these processes are doing?

Output

[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]

Recommended answer

I think you have two problems here:

def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break

From the docs for get:

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).

Since you are passing 0 as the first parameter, it's equivalent to get(False). This makes it non-blocking: if it can't get a value out immediately, it raises an Empty exception, which ends your worker process. Since all your 'work' functions are identical and try to pull from the main queue at the same time, some of them might not be able to get a value right away (for example, because another process is holding the queue's internal read lock at that moment) and will die.
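
The equivalence is easy to check. The sketch below is written for Python 3, where Empty lives in the queue module, and the helper name try_get_nowait is hypothetical. It shows that get(0) behaves like get(block=False): on an empty queue it raises Empty straight away instead of waiting for an item.

```python
import multiprocessing
import queue  # Python 3 home of the Empty exception
import time


def try_get_nowait(q):
    """Attempt q.get(0); return (item_or_None, raised_empty, seconds_spent)."""
    start = time.monotonic()
    try:
        item = q.get(0)  # 0 is the "block" argument, so this is get(block=False)
    except queue.Empty:
        return None, True, time.monotonic() - start
    return item, False, time.monotonic() - start


if __name__ == "__main__":
    q = multiprocessing.Queue()
    item, raised, elapsed = try_get_nowait(q)
    print(raised)  # True: the queue is empty, so Empty is raised at once
    print(f"gave up after {elapsed:.4f} s")
```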

Giving the .get() a small timeout should fix this problem.
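
As a rough illustration of the timeout variant (Python 3; the helper name timed_get and the timeout values are assumptions for the example): get(True, timeout) still returns as soon as an item is available, and only raises Empty after waiting roughly `timeout` seconds on an empty queue.

```python
import multiprocessing
import queue
import time


def timed_get(q, timeout=0.1):
    """Call q.get(True, timeout).

    Returns (item, seconds_spent); item is None if Empty was raised.
    """
    start = time.monotonic()
    try:
        item = q.get(True, timeout)  # block, but for at most `timeout` seconds
    except queue.Empty:
        item = None
    return item, time.monotonic() - start


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("work")
    # An item is available, so this returns it without waiting out the timeout
    print(timed_get(q, timeout=1.0)[0])
    # The queue is now empty, so this waits about 0.1 s and then gives up
    item, waited = timed_get(q)
    print(item, f"after waiting {waited:.2f} s")
```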

The second problem is that your 'work' function takes basically zero time to complete. Give it a little pause with sleep(.2) to simulate some non-trivial work, and the items will be spread across the workers:

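from Queue import Empty  # Python 2; in Python 3 use "from queue import Empty"
from time import sleep

def my_callable(from_queue, to_queue):
    while True:
        try:
            # Block for at most 0.1 s instead of giving up immediately
            tmp = from_queue.get(True, .1)
            # Simulate some non-trivial work
            sleep(0.2)
            to_queue.put(tmp)
        except Empty:
            # Nothing arrived within the timeout: assume the queue is drained
            break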
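
Putting both fixes together, here is a self-contained Python 3 sketch that runs the corrected worker and reports which items each process handled. It deviates from the original in one assumed way: instead of one output queue per worker, each worker tags its results with a hypothetical worker_id and sends them to a single shared results queue, so the main process can read exactly one result per item before joining the workers. The run() helper and its n_items, n_workers and work_time parameters are also illustrative, added so the demo can be shortened.

```python
import multiprocessing
import queue
import time


def my_callable(worker_id, from_queue, results, work_time):
    while True:
        try:
            tmp = from_queue.get(True, 0.1)  # wait up to 0.1 s for work
        except queue.Empty:
            break  # nothing arrived in time: assume the work is done
        time.sleep(work_time)  # simulate non-trivial work
        results.put((worker_id, tmp))


def run(n_items=100, n_workers=5, work_time=0.2):
    """Return {worker_id: [items handled by that worker]}."""
    main_queue = multiprocessing.Queue()
    for i in range(n_items):
        main_queue.put(i)

    results = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=my_callable,
                                args=(w, main_queue, results, work_time))
        for w in range(n_workers)
    ]
    for p in processes:
        p.start()

    handled = {w: [] for w in range(n_workers)}
    # Read one result per item *before* joining: the multiprocessing docs warn
    # that joining a process which still has buffered queue data can deadlock.
    for _ in range(n_items):
        worker_id, item = results.get()
        handled[worker_id].append(item)

    for p in processes:
        p.join()
    return handled


if __name__ == "__main__":
    for worker_id, items in run().items():
        print(worker_id, len(items), items)
```

The if __name__ == "__main__" guard matters here: with the "spawn" start method (the default on Windows and macOS), each child re-imports the main module, and the guard stops it from starting workers of its own.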

I forgot to mention: for this type of problem it is generally better not to rely on the timeout of .get() to signal the end of the queue. You get more control if you pass some kind of "end of queue" marker object into the queue that tells the workers it is time to quit. That way they can all block, waiting for either new input or an exit "command".
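
A minimal sketch of that marker approach in Python 3 (the names STOP, worker and run_with_sentinels are illustrative, not from the answer): the main process puts one STOP marker per worker after the real items, and each worker blocks on get() with no timeout until it receives one. STOP is None rather than a custom object() because items are pickled on their way through the queue, so an identity check against a freshly created object would never match in the worker, whereas None unpickles as the same singleton.

```python
import multiprocessing

STOP = None  # end-of-queue marker; None survives pickling as the same singleton


def worker(worker_id, tasks, results):
    while True:
        item = tasks.get()  # blocks until an item or the marker arrives
        if item is STOP:
            break  # each worker consumes exactly one marker, then exits
        results.put((worker_id, item * item))  # stand-in for real work


def run_with_sentinels(items, n_workers=5):
    """Square every item using n_workers processes; return [(worker_id, value)]."""
    items = list(items)
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(w, tasks, results))
             for w in range(n_workers)]
    for p in procs:
        p.start()
    for item in items:
        tasks.put(item)
    for _ in procs:
        tasks.put(STOP)  # one marker per worker, queued after all real items
    # Collect every result before joining the workers
    out = [results.get() for _ in range(len(items))]
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    out = run_with_sentinels(range(10), n_workers=3)
    print(sorted(value for _, value in out))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because nothing relies on a timeout, a slow producer cannot make the workers quit early; they simply wait for the next item or for their STOP marker.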